00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <oasys/io/NetUtils.h>
00019 #include <oasys/util/Time.h>
00020 #include "DTNTunnel.h"
00021 #include "TCPTunnel.h"
00022
00023 namespace dtntunnel {
00024
00025
00026 TCPTunnel::TCPTunnel()
00027 : IPTunnel("TCPTunnel", "/dtntunnel/tcp")
00028 {
00029 }
00030
00031
00032 void
00033 TCPTunnel::add_listener(in_addr_t listen_addr, u_int16_t listen_port,
00034 in_addr_t remote_addr, u_int16_t remote_port)
00035 {
00036 new Listener(this, listen_addr, listen_port,
00037 remote_addr, remote_port);
00038 }
00039
00040
00041 void
00042 TCPTunnel::new_connection(Connection* c)
00043 {
00044 oasys::ScopeLock l(&lock_, "TCPTunnel::new_connection");
00045
00046 ConnTable::iterator i;
00047 ConnKey key(c->client_addr_,
00048 c->client_port_,
00049 c->remote_addr_,
00050 c->remote_port_);
00051
00052 i = connections_.find(key);
00053
00054 if (i != connections_.end()) {
00055 log_err("got duplicate connection %s:%d -> %s:%d",
00056 intoa(c->client_addr_),
00057 c->client_port_,
00058 intoa(c->remote_addr_),
00059 c->remote_port_);
00060 return;
00061 }
00062
00063 connections_.insert(ConnTable::value_type(key, c));
00064 }
00065
00066
00067 void
00068 TCPTunnel::kill_connection(Connection* c)
00069 {
00070 oasys::ScopeLock l(&lock_, "TCPTunnel::kill_connection");
00071
00072 ConnTable::iterator i;
00073 ConnKey key(c->client_addr_,
00074 c->client_port_,
00075 c->remote_addr_,
00076 c->remote_port_);
00077
00078 i = connections_.find(key);
00079
00080 if (i == connections_.end()) {
00081 log_err("can't find connection to kill %s:%d -> %s:%d",
00082 intoa(c->client_addr_),
00083 c->client_port_,
00084 intoa(c->remote_addr_),
00085 c->remote_port_);
00086 return;
00087 }
00088
00089 connections_.erase(i);
00090 }
00091
00092
00093 void
00094 TCPTunnel::handle_bundle(dtn::APIBundle* bundle)
00095 {
00096 oasys::ScopeLock l(&lock_, "TCPTunnel::handle_bundle");
00097
00098 log_debug("handle_bundle got %zu byte bundle", bundle->payload_.len());
00099
00100 DTNTunnel::BundleHeader hdr;
00101 memcpy(&hdr, bundle->payload_.buf(), sizeof(hdr));
00102 hdr.client_port_ = htons(hdr.client_port_);
00103 hdr.remote_port_ = htons(hdr.remote_port_);
00104
00105 Connection* conn;
00106 ConnTable::iterator i;
00107 ConnKey key(hdr.client_addr_,
00108 hdr.client_port_,
00109 hdr.remote_addr_,
00110 hdr.remote_port_);
00111
00112 i = connections_.find(key);
00113
00114 if (i == connections_.end()) {
00115 log_info("new connection %s:%d -> %s:%d",
00116 intoa(hdr.client_addr_),
00117 hdr.client_port_,
00118 intoa(hdr.remote_addr_),
00119 hdr.remote_port_);
00120
00121 conn = new Connection(this, &bundle->spec_.source,
00122 hdr.client_addr_, hdr.client_port_,
00123 hdr.remote_addr_, hdr.remote_port_);
00124 conn->start();
00125 connections_.insert(ConnTable::value_type(key, conn));
00126
00127 } else {
00128 conn = i->second;
00129 ASSERT(conn != NULL);
00130 }
00131
00132 conn->handle_bundle(bundle);
00133 return;
00134 }
00135
00136
00137 TCPTunnel::Listener::Listener(TCPTunnel* t,
00138 in_addr_t listen_addr, u_int16_t listen_port,
00139 in_addr_t remote_addr, u_int16_t remote_port)
00140 : TCPServerThread("TCPTunnel::Listener",
00141 "/dtntunnel/tcp/listener",
00142 Thread::DELETE_ON_EXIT),
00143 tcptun_(t),
00144 listen_addr_(listen_addr),
00145 listen_port_(listen_port),
00146 remote_addr_(remote_addr),
00147 remote_port_(remote_port)
00148 {
00149 bind_listen_start(listen_addr, listen_port);
00150 }
00151
00152
00153 void
00154 TCPTunnel::Listener::accepted(int fd, in_addr_t addr, u_int16_t port)
00155 {
00156 Connection* c = new Connection(tcptun_, DTNTunnel::instance()->dest_eid(),
00157 fd, addr, port, remote_addr_, remote_port_);
00158 tcptun_->new_connection(c);
00159 c->start();
00160 }
00161
00162
00163 TCPTunnel::Connection::Connection(TCPTunnel* t, dtn_endpoint_id_t* dest_eid,
00164 in_addr_t client_addr, u_int16_t client_port,
00165 in_addr_t remote_addr, u_int16_t remote_port)
00166 : Thread("TCPTunnel::Connection", Thread::DELETE_ON_EXIT),
00167 Logger("TCPTunnel::Connection", "/dtntunnel/tcp/conn"),
00168 tcptun_(t),
00169 sock_(logpath_),
00170 queue_(logpath_),
00171 next_seqno_(0),
00172 client_addr_(client_addr),
00173 client_port_(client_port),
00174 remote_addr_(remote_addr),
00175 remote_port_(remote_port)
00176 {
00177 dtn_copy_eid(&dest_eid_, dest_eid);
00178 }
00179
00180
00181 TCPTunnel::Connection::Connection(TCPTunnel* t, dtn_endpoint_id_t* dest_eid,
00182 int fd,
00183 in_addr_t client_addr, u_int16_t client_port,
00184 in_addr_t remote_addr, u_int16_t remote_port)
00185 : Thread("TCPTunnel::Connection", Thread::DELETE_ON_EXIT),
00186 Logger("TCPTunnel::Connection", "/dtntunnel/tcp/conn"),
00187 tcptun_(t),
00188 sock_(fd, client_addr, client_port, logpath_),
00189 queue_(logpath_),
00190 next_seqno_(0),
00191 client_addr_(client_addr),
00192 client_port_(client_port),
00193 remote_addr_(remote_addr),
00194 remote_port_(remote_port)
00195 {
00196 dtn_copy_eid(&dest_eid_, dest_eid);
00197 }
00198
00199
00200 TCPTunnel::Connection::~Connection()
00201 {
00202 dtn::APIBundle* b;
00203 while(queue_.try_pop(&b)) {
00204 delete b;
00205 }
00206 }
00207
00208
00209 void
00210 TCPTunnel::Connection::run()
00211 {
00212 DTNTunnel* tunnel = DTNTunnel::instance();
00213 u_int32_t send_seqno = 0;
00214 u_int32_t next_recv_seqno = 0;
00215 u_int32_t total_sent = 0;
00216 bool sock_eof = false;
00217 bool dtn_blocked = false;
00218
00219
00220 dtn::APIBundle* b_xmit = NULL;
00221 dtn::APIBundle* b_recv = NULL;
00222
00223
00224 oasys::Time tbegin, tnow;
00225 ASSERT(tbegin.sec_ == 0);
00226
00227
00228 DTNTunnel::BundleHeader hdr;
00229 hdr.protocol_ = IPPROTO_TCP;
00230 hdr.seqno_ = 0;
00231 hdr.client_addr_ = client_addr_;
00232 hdr.client_port_ = htons(client_port_);
00233 hdr.remote_addr_ = remote_addr_;
00234 hdr.remote_port_ = htons(remote_port_);
00235
00236 if (sock_.state() != oasys::IPSocket::ESTABLISHED) {
00237 int err = sock_.connect(remote_addr_, remote_port_);
00238 if (err != 0) {
00239 log_err("error connecting to %s:%d",
00240 intoa(remote_addr_), remote_port_);
00241
00242
00243 dtn::APIBundle* b = new dtn::APIBundle();
00244 hdr.eof_ = 1;
00245 memcpy(b->payload_.buf(sizeof(hdr)), &hdr, sizeof(hdr));
00246 b->payload_.set_len(sizeof(hdr));
00247 int err;
00248 if ((err = tunnel->send_bundle(b, &dest_eid_)) != DTN_SUCCESS) {
00249 log_err("error sending connect reply bundle: %s",
00250 dtn_strerror(err));
00251 tcptun_->kill_connection(this);
00252 exit(1);
00253 }
00254 goto done;
00255 }
00256 }
00257
00258 while (1) {
00259 struct pollfd pollfds[2];
00260
00261 struct pollfd* msg_poll = &pollfds[0];
00262 msg_poll->fd = queue_.read_fd();
00263 msg_poll->events = POLLIN;
00264 msg_poll->revents = 0;
00265
00266 struct pollfd* sock_poll = &pollfds[1];
00267 sock_poll->fd = sock_.fd();
00268 sock_poll->events = POLLIN | POLLERR;
00269 sock_poll->revents = 0;
00270
00271
00272
00273 log_debug("blocking in poll...");
00274 int nfds = (sock_eof || dtn_blocked) ? 1 : 2;
00275
00276 int timeout = -1;
00277 if (dtn_blocked) {
00278 timeout = 1000;
00279 } else if (tbegin.sec_ != 0) {
00280 timeout = tunnel->delay();
00281 }
00282
00283 int nready = oasys::IO::poll_multiple(pollfds, nfds, timeout,
00284 NULL, logpath_);
00285 if (nready == oasys::IOERROR) {
00286 log_err("unexpected error in poll: %s", strerror(errno));
00287 goto done;
00288 }
00289
00290
00291 if (sock_poll->revents != 0) {
00292 if (b_xmit == NULL) {
00293 b_xmit = new dtn::APIBundle();
00294 b_xmit->payload_.reserve(tunnel->max_size());
00295 hdr.seqno_ = ntohl(send_seqno++);
00296 memcpy(b_xmit->payload_.buf(), &hdr, sizeof(hdr));
00297 b_xmit->payload_.set_len(sizeof(hdr));
00298 }
00299
00300 u_int payload_todo = tunnel->max_size() - b_xmit->payload_.len();
00301
00302 if (payload_todo != 0) {
00303 tbegin.get_time();
00304
00305 char* bp = b_xmit->payload_.end();
00306 int ret = sock_.read(bp, payload_todo);
00307 if (ret < 0) {
00308 log_err("error reading from socket: %s", strerror(errno));
00309 delete b_xmit;
00310 goto done;
00311 }
00312
00313 b_xmit->payload_.set_len(b_xmit->payload_.len() + ret);
00314
00315 if (ret == 0) {
00316 DTNTunnel::BundleHeader* hdrp =
00317 (DTNTunnel::BundleHeader*)b_xmit->payload_.buf();
00318 hdrp->eof_ = 1;
00319 sock_eof = true;
00320 }
00321 }
00322 }
00323
00324
00325 tnow.get_time();
00326 if ((b_xmit != NULL) &&
00327 ((sock_eof == true) ||
00328 (b_xmit->payload_.len() == tunnel->max_size()) ||
00329 ((tnow - tbegin).in_milliseconds() >= tunnel->delay())))
00330 {
00331 size_t len = b_xmit->payload_.len();
00332 int err = tunnel->send_bundle(b_xmit, &dest_eid_);
00333 if (err == DTN_SUCCESS) {
00334 total_sent += len;
00335 log_info("sent %zu byte payload #%u to dtn (%u total)",
00336 len, send_seqno, total_sent);
00337 b_xmit = NULL;
00338 tbegin.sec_ = 0;
00339 tbegin.usec_ = 0;
00340 dtn_blocked = false;
00341
00342 } else if (err == DTN_ENOSPACE) {
00343 log_info("no space for %zu byte payload... "
00344 "setting dtn_blocked", len);
00345 dtn_blocked = true;
00346 continue;
00347 } else {
00348 log_err("error sending bundle: %s", dtn_strerror(err));
00349 exit(1);
00350 }
00351 }
00352
00353
00354 if (msg_poll->revents != 0) {
00355 b_recv = queue_.pop_blocking();
00356 ASSERT(b_recv);
00357
00358 DTNTunnel::BundleHeader* recv_hdr =
00359 (DTNTunnel::BundleHeader*)b_recv->payload_.buf();
00360
00361 u_int32_t recv_seqno = ntohl(recv_hdr->seqno_);
00362
00363
00364
00365
00366 if (recv_seqno != next_recv_seqno) {
00367 log_err("got out of order bundle: seqno %d, expected %d",
00368 recv_seqno, next_recv_seqno);
00369 delete b_recv;
00370 goto done;
00371 }
00372 ++next_recv_seqno;
00373
00374 u_int len = b_recv->payload_.len() - sizeof(hdr);
00375
00376 if (len == 0) {
00377 log_info("got zero byte payload... closing connection");
00378 sock_.close();
00379 delete b_recv;
00380 goto done;
00381 }
00382
00383 int cc = sock_.writeall(b_recv->payload_.buf() + sizeof(hdr), len);
00384 if (cc != (int)len) {
00385 log_err("error writing payload to socket: %s", strerror(errno));
00386 delete b_recv;
00387 goto done;
00388 }
00389
00390 log_info("sent %d byte payload to client", len);
00391
00392 if (recv_hdr->eof_) {
00393 log_info("bundle had eof bit set... closing connection");
00394 sock_.close();
00395 }
00396
00397 delete b_recv;
00398 }
00399 }
00400
00401 done:
00402 tcptun_->kill_connection(this);
00403 }
00404
00405
00406 void
00407 TCPTunnel::Connection::handle_bundle(dtn::APIBundle* bundle)
00408 {
00409 DTNTunnel::BundleHeader* hdr =
00410 (DTNTunnel::BundleHeader*)bundle->payload_.buf();
00411
00412 u_int32_t recv_seqno = ntohl(hdr->seqno_);
00413
00414
00415 if (recv_seqno != next_seqno_)
00416 {
00417 log_info("got out of order bundle: expected seqno %d, got %d",
00418 next_seqno_, recv_seqno);
00419
00420 reorder_table_[recv_seqno] = bundle;
00421 return;
00422 }
00423
00424
00425 log_info("delivering %zu byte bundle with seqno %d",
00426 bundle->payload_.len(), recv_seqno);
00427 queue_.push_back(bundle);
00428 next_seqno_++;
00429
00430
00431
00432 ReorderTable::iterator iter;
00433 while (1) {
00434 iter = reorder_table_.find(next_seqno_);
00435 if (iter == reorder_table_.end()) {
00436 break;
00437 }
00438
00439 bundle = iter->second;
00440 log_info("delivering %zu byte bundle with seqno %d (from reorder table)",
00441 bundle->payload_.len(), next_seqno_);
00442
00443 reorder_table_.erase(iter);
00444 next_seqno_++;
00445
00446 queue_.push_back(bundle);
00447 }
00448 }
00449
00450 }
00451