TCPTunnel.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 
00018 #include <oasys/io/NetUtils.h>
00019 #include <oasys/util/Time.h>
00020 #include "DTNTunnel.h"
00021 #include "TCPTunnel.h"
00022 
00023 namespace dtntunnel {
00024 
00025 //----------------------------------------------------------------------
00026 TCPTunnel::TCPTunnel()
00027     : IPTunnel("TCPTunnel", "/dtntunnel/tcp")
00028 {
00029 }
00030 
00031 //----------------------------------------------------------------------
00032 void
00033 TCPTunnel::add_listener(in_addr_t listen_addr, u_int16_t listen_port,
00034                         in_addr_t remote_addr, u_int16_t remote_port)
00035 {
00036     new Listener(this, listen_addr, listen_port,
00037                  remote_addr, remote_port);
00038 }
00039 
00040 //----------------------------------------------------------------------
00041 void
00042 TCPTunnel::new_connection(Connection* c)
00043 {
00044     oasys::ScopeLock l(&lock_, "TCPTunnel::new_connection");
00045     
00046     ConnTable::iterator i;
00047     ConnKey key(c->client_addr_,
00048                 c->client_port_,
00049                 c->remote_addr_,
00050                 c->remote_port_);
00051     
00052     i = connections_.find(key);
00053     
00054     if (i != connections_.end()) {
00055         log_err("got duplicate connection %s:%d -> %s:%d",
00056                 intoa(c->client_addr_),
00057                 c->client_port_,
00058                 intoa(c->remote_addr_),
00059                 c->remote_port_);
00060         return;
00061     }
00062 
00063     connections_.insert(ConnTable::value_type(key, c));
00064 }
00065 
00066 //----------------------------------------------------------------------
00067 void
00068 TCPTunnel::kill_connection(Connection* c)
00069 {
00070     oasys::ScopeLock l(&lock_, "TCPTunnel::kill_connection");
00071     
00072     ConnTable::iterator i;
00073     ConnKey key(c->client_addr_,
00074                 c->client_port_,
00075                 c->remote_addr_,
00076                 c->remote_port_);
00077     
00078     i = connections_.find(key);
00079 
00080     if (i == connections_.end()) {
00081         log_err("can't find connection to kill %s:%d -> %s:%d",
00082                 intoa(c->client_addr_),
00083                 c->client_port_,
00084                 intoa(c->remote_addr_),
00085                 c->remote_port_);
00086         return;
00087     }
00088 
00089     connections_.erase(i);
00090 }
00091 
00092 //----------------------------------------------------------------------
00093 void
00094 TCPTunnel::handle_bundle(dtn::APIBundle* bundle)
00095 {
00096     oasys::ScopeLock l(&lock_, "TCPTunnel::handle_bundle");
00097 
00098     log_debug("handle_bundle got %zu byte bundle", bundle->payload_.len());
00099     
00100     DTNTunnel::BundleHeader hdr;
00101     memcpy(&hdr, bundle->payload_.buf(), sizeof(hdr));
00102     hdr.client_port_ = htons(hdr.client_port_);
00103     hdr.remote_port_ = htons(hdr.remote_port_);
00104 
00105     Connection* conn;
00106     ConnTable::iterator i;
00107     ConnKey key(hdr.client_addr_,
00108                 hdr.client_port_,
00109                 hdr.remote_addr_,
00110                 hdr.remote_port_);
00111     
00112     i = connections_.find(key);
00113 
00114     if (i == connections_.end()) {
00115         log_info("new connection %s:%d -> %s:%d",
00116                  intoa(hdr.client_addr_),
00117                  hdr.client_port_,
00118                  intoa(hdr.remote_addr_),
00119                  hdr.remote_port_);
00120         
00121         conn = new Connection(this, &bundle->spec_.source,
00122                               hdr.client_addr_, hdr.client_port_,
00123                               hdr.remote_addr_, hdr.remote_port_);
00124         conn->start();
00125         connections_.insert(ConnTable::value_type(key, conn));
00126 
00127     } else {
00128         conn = i->second;
00129         ASSERT(conn != NULL);
00130     }
00131 
00132     conn->handle_bundle(bundle);
00133     return;
00134 }
00135 
00136 //----------------------------------------------------------------------
00137 TCPTunnel::Listener::Listener(TCPTunnel* t,
00138                               in_addr_t listen_addr, u_int16_t listen_port,
00139                               in_addr_t remote_addr, u_int16_t remote_port)
00140     : TCPServerThread("TCPTunnel::Listener",
00141                       "/dtntunnel/tcp/listener",
00142                       Thread::DELETE_ON_EXIT),
00143       tcptun_(t),
00144       listen_addr_(listen_addr),
00145       listen_port_(listen_port),
00146       remote_addr_(remote_addr),
00147       remote_port_(remote_port)
00148 {
00149     bind_listen_start(listen_addr, listen_port);
00150 }
00151 
00152 //----------------------------------------------------------------------
00153 void
00154 TCPTunnel::Listener::accepted(int fd, in_addr_t addr, u_int16_t port)
00155 {
00156     Connection* c = new Connection(tcptun_, DTNTunnel::instance()->dest_eid(),
00157                                    fd, addr, port, remote_addr_, remote_port_);
00158     tcptun_->new_connection(c);
00159     c->start();
00160 }
00161 
00162 //----------------------------------------------------------------------
00163 TCPTunnel::Connection::Connection(TCPTunnel* t, dtn_endpoint_id_t* dest_eid,
00164                                   in_addr_t client_addr, u_int16_t client_port,
00165                                   in_addr_t remote_addr, u_int16_t remote_port)
00166     : Thread("TCPTunnel::Connection", Thread::DELETE_ON_EXIT),
00167       Logger("TCPTunnel::Connection", "/dtntunnel/tcp/conn"),
00168       tcptun_(t),
00169       sock_(logpath_),
00170       queue_(logpath_),
00171       next_seqno_(0),
00172       client_addr_(client_addr),
00173       client_port_(client_port),
00174       remote_addr_(remote_addr),
00175       remote_port_(remote_port)
00176 {
00177     dtn_copy_eid(&dest_eid_, dest_eid);
00178 }
00179 
00180 //----------------------------------------------------------------------
00181 TCPTunnel::Connection::Connection(TCPTunnel* t, dtn_endpoint_id_t* dest_eid,
00182                                   int fd,
00183                                   in_addr_t client_addr, u_int16_t client_port,
00184                                   in_addr_t remote_addr, u_int16_t remote_port)
00185     : Thread("TCPTunnel::Connection", Thread::DELETE_ON_EXIT),
00186       Logger("TCPTunnel::Connection", "/dtntunnel/tcp/conn"),
00187       tcptun_(t),
00188       sock_(fd, client_addr, client_port, logpath_),
00189       queue_(logpath_),
00190       next_seqno_(0),
00191       client_addr_(client_addr),
00192       client_port_(client_port),
00193       remote_addr_(remote_addr),
00194       remote_port_(remote_port)
00195 {
00196     dtn_copy_eid(&dest_eid_, dest_eid);
00197 }
00198 
00199 //----------------------------------------------------------------------
00200 TCPTunnel::Connection::~Connection()
00201 {
00202     dtn::APIBundle* b;
00203     while(queue_.try_pop(&b)) {
00204         delete b;
00205     }
00206 }
00207 
00208 //----------------------------------------------------------------------
00209 void
00210 TCPTunnel::Connection::run()
00211 {
00212     DTNTunnel* tunnel = DTNTunnel::instance();
00213     u_int32_t send_seqno = 0;
00214     u_int32_t next_recv_seqno = 0;
00215     u_int32_t total_sent = 0;
00216     bool sock_eof = false;
00217     bool dtn_blocked = false;
00218 
00219     // outgoing (tcp -> dtn) / incoming (dtn -> tcp) bundles
00220     dtn::APIBundle* b_xmit = NULL;
00221     dtn::APIBundle* b_recv = NULL;
00222 
00223     // time values to implement nagle
00224     oasys::Time tbegin, tnow;
00225     ASSERT(tbegin.sec_ == 0);
00226     
00227     // header for outgoing bundles
00228     DTNTunnel::BundleHeader hdr;
00229     hdr.protocol_    = IPPROTO_TCP;
00230     hdr.seqno_       = 0;
00231     hdr.client_addr_ = client_addr_;
00232     hdr.client_port_ = htons(client_port_);
00233     hdr.remote_addr_ = remote_addr_;
00234     hdr.remote_port_ = htons(remote_port_);
00235     
00236     if (sock_.state() != oasys::IPSocket::ESTABLISHED) {
00237         int err = sock_.connect(remote_addr_, remote_port_);
00238         if (err != 0) {
00239             log_err("error connecting to %s:%d",
00240                     intoa(remote_addr_), remote_port_);
00241 
00242             // send an empty bundle back
00243             dtn::APIBundle* b = new dtn::APIBundle();
00244             hdr.eof_ = 1;
00245             memcpy(b->payload_.buf(sizeof(hdr)), &hdr, sizeof(hdr));
00246             b->payload_.set_len(sizeof(hdr));
00247             int err;
00248             if ((err = tunnel->send_bundle(b, &dest_eid_)) != DTN_SUCCESS) {
00249                 log_err("error sending connect reply bundle: %s",
00250                         dtn_strerror(err));
00251                 tcptun_->kill_connection(this);
00252                 exit(1);
00253             }
00254             goto done;
00255         }
00256     }
00257 
00258     while (1) {
00259         struct pollfd pollfds[2];
00260 
00261         struct pollfd* msg_poll  = &pollfds[0];
00262         msg_poll->fd             = queue_.read_fd();
00263         msg_poll->events         = POLLIN;
00264         msg_poll->revents        = 0;
00265 
00266         struct pollfd* sock_poll = &pollfds[1];
00267         sock_poll->fd            = sock_.fd();
00268         sock_poll->events        = POLLIN | POLLERR;
00269         sock_poll->revents       = 0;
00270 
00271         // if the socket already had an eof or if dtn is write
00272         // blocked, we just poll for activity on the message queue
00273         log_debug("blocking in poll...");
00274         int nfds = (sock_eof || dtn_blocked) ? 1 : 2;
00275 
00276         int timeout = -1;
00277         if (dtn_blocked) {
00278             timeout = 1000; // one second between retries
00279         } else if (tbegin.sec_ != 0) {
00280             timeout = tunnel->delay();
00281         }
00282         
00283         int nready = oasys::IO::poll_multiple(pollfds, nfds, timeout,
00284                                               NULL, logpath_);
00285         if (nready == oasys::IOERROR) {
00286             log_err("unexpected error in poll: %s", strerror(errno));
00287             goto done;
00288         }
00289 
00290         // check first for activity on the socket
00291         if (sock_poll->revents != 0) {
00292             if (b_xmit == NULL) {
00293                 b_xmit = new dtn::APIBundle();
00294                 b_xmit->payload_.reserve(tunnel->max_size());
00295                 hdr.seqno_ = ntohl(send_seqno++);
00296                 memcpy(b_xmit->payload_.buf(), &hdr, sizeof(hdr));
00297                 b_xmit->payload_.set_len(sizeof(hdr));
00298             }
00299 
00300             u_int payload_todo = tunnel->max_size() - b_xmit->payload_.len();
00301 
00302             if (payload_todo != 0) {
00303                 tbegin.get_time();
00304                 
00305                 char* bp = b_xmit->payload_.end();
00306                 int ret = sock_.read(bp, payload_todo);
00307                 if (ret < 0) {
00308                     log_err("error reading from socket: %s", strerror(errno));
00309                     delete b_xmit;
00310                     goto done;
00311                 }
00312                 
00313                 b_xmit->payload_.set_len(b_xmit->payload_.len() + ret);
00314                 
00315                 if (ret == 0) {
00316                     DTNTunnel::BundleHeader* hdrp =
00317                         (DTNTunnel::BundleHeader*)b_xmit->payload_.buf();
00318                     hdrp->eof_ = 1;
00319                     sock_eof = true;
00320                 }
00321             }
00322         }
00323 
00324         // now check if we should send the outgoing bundle
00325         tnow.get_time();
00326         if ((b_xmit != NULL) &&
00327             ((sock_eof == true) ||
00328              (b_xmit->payload_.len() == tunnel->max_size()) ||
00329              ((tnow - tbegin).in_milliseconds() >= tunnel->delay())))
00330         {
00331             size_t len = b_xmit->payload_.len();
00332             int err = tunnel->send_bundle(b_xmit, &dest_eid_);
00333             if (err == DTN_SUCCESS) {
00334                 total_sent += len;
00335                 log_info("sent %zu byte payload #%u to dtn (%u total)",
00336                          len, send_seqno, total_sent);
00337                 b_xmit = NULL;
00338                 tbegin.sec_ = 0;
00339                 tbegin.usec_ = 0;
00340                 dtn_blocked = false;
00341                 
00342             } else if (err == DTN_ENOSPACE) {
00343                 log_info("no space for %zu byte payload... "
00344                          "setting dtn_blocked", len);
00345                 dtn_blocked = true;
00346                 continue;
00347             } else {
00348                 log_err("error sending bundle: %s", dtn_strerror(err));
00349                 exit(1);
00350             }
00351         }
00352         
00353         // now check for activity on the incoming bundle queue
00354         if (msg_poll->revents != 0) {
00355             b_recv = queue_.pop_blocking();
00356             ASSERT(b_recv);
00357 
00358             DTNTunnel::BundleHeader* recv_hdr =
00359                 (DTNTunnel::BundleHeader*)b_recv->payload_.buf();
00360 
00361             u_int32_t recv_seqno = ntohl(recv_hdr->seqno_);
00362 
00363             // check the seqno match -- reordering should have been
00364             // handled before the bundle was put on the blocking
00365             // message queue
00366             if (recv_seqno != next_recv_seqno) {
00367                 log_err("got out of order bundle: seqno %d, expected %d",
00368                         recv_seqno, next_recv_seqno);
00369                 delete b_recv;
00370                 goto done;
00371             }
00372             ++next_recv_seqno;
00373 
00374             u_int len = b_recv->payload_.len() - sizeof(hdr);
00375 
00376             if (len == 0) {
00377                 log_info("got zero byte payload... closing connection");
00378                 sock_.close();
00379                 delete b_recv;
00380                 goto done;
00381             }
00382             
00383             int cc = sock_.writeall(b_recv->payload_.buf() + sizeof(hdr), len);
00384             if (cc != (int)len) {
00385                 log_err("error writing payload to socket: %s", strerror(errno));
00386                 delete b_recv;
00387                 goto done;
00388             }
00389             
00390             log_info("sent %d byte payload to client", len);
00391 
00392             if (recv_hdr->eof_) {
00393                 log_info("bundle had eof bit set... closing connection");
00394                 sock_.close();
00395             }
00396             
00397             delete b_recv;
00398         }
00399     }
00400 
00401  done:
00402     tcptun_->kill_connection(this);
00403 }
00404 
00405 //----------------------------------------------------------------------
00406 void
00407 TCPTunnel::Connection::handle_bundle(dtn::APIBundle* bundle)
00408 {
00409     DTNTunnel::BundleHeader* hdr =
00410         (DTNTunnel::BundleHeader*)bundle->payload_.buf();
00411     
00412     u_int32_t recv_seqno = ntohl(hdr->seqno_);
00413     
00414     // if it's out of order, stick it in the reorder table and return
00415     if (recv_seqno != next_seqno_)
00416     {
00417         log_info("got out of order bundle: expected seqno %d, got %d",
00418                  next_seqno_, recv_seqno);
00419         
00420         reorder_table_[recv_seqno] = bundle;
00421         return;
00422     }
00423 
00424     // deliver the one that just arrived
00425     log_info("delivering %zu byte bundle with seqno %d",
00426              bundle->payload_.len(), recv_seqno);
00427     queue_.push_back(bundle);
00428     next_seqno_++;
00429     
00430     // once we get one that's in order, that might let us transfer
00431     // more bundles out of the reorder table and into the queue
00432     ReorderTable::iterator iter;
00433     while (1) {
00434         iter = reorder_table_.find(next_seqno_);
00435         if (iter == reorder_table_.end()) {
00436             break;
00437         }
00438 
00439         bundle = iter->second;
00440         log_info("delivering %zu byte bundle with seqno %d (from reorder table)",
00441                  bundle->payload_.len(), next_seqno_);
00442         
00443         reorder_table_.erase(iter);
00444         next_seqno_++;
00445         
00446         queue_.push_back(bundle);
00447     }
00448 }
00449 
00450 } // namespace dtntunnel
00451 

Generated on Sat Sep 8 08:36:18 2007 for DTN Reference Implementation by  doxygen 1.5.3