00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include <oasys/io/NetUtils.h>
00040 #include "DTNTunnel.h"
00041 #include "TCPTunnel.h"
00042
00043 namespace dtntunnel {
00044
00045
00046 TCPTunnel::TCPTunnel()
00047 : IPTunnel("TCPTunnel", "/dtntunnel/tcp")
00048 {
00049 }
00050
00051
00052 void
00053 TCPTunnel::add_listener(in_addr_t listen_addr, u_int16_t listen_port,
00054 in_addr_t remote_addr, u_int16_t remote_port)
00055 {
00056 new Listener(this, listen_addr, listen_port,
00057 remote_addr, remote_port);
00058 }
00059
00060
00061 void
00062 TCPTunnel::new_connection(Connection* c)
00063 {
00064 oasys::ScopeLock l(&lock_, "TCPTunnel::new_connection");
00065
00066 ConnTable::iterator i;
00067 ConnKey key(c->client_addr_,
00068 c->client_port_,
00069 c->remote_addr_,
00070 c->remote_port_);
00071
00072 i = connections_.find(key);
00073
00074 if (i != connections_.end()) {
00075 log_err("got duplicate connection %s:%d -> %s:%d",
00076 intoa(c->client_addr_),
00077 c->client_port_,
00078 intoa(c->remote_addr_),
00079 c->remote_port_);
00080 return;
00081 }
00082
00083 connections_.insert(ConnTable::value_type(key, c));
00084 }
00085
00086
00087 void
00088 TCPTunnel::kill_connection(Connection* c)
00089 {
00090 oasys::ScopeLock l(&lock_, "TCPTunnel::kill_connection");
00091
00092 ConnTable::iterator i;
00093 ConnKey key(c->client_addr_,
00094 c->client_port_,
00095 c->remote_addr_,
00096 c->remote_port_);
00097
00098 i = connections_.find(key);
00099
00100 if (i == connections_.end()) {
00101 log_err("can't find connection to kill %s:%d -> %s:%d",
00102 intoa(c->client_addr_),
00103 c->client_port_,
00104 intoa(c->remote_addr_),
00105 c->remote_port_);
00106 return;
00107 }
00108
00109 connections_.erase(i);
00110 }
00111
00112
00113 void
00114 TCPTunnel::handle_bundle(dtn::APIBundle* bundle)
00115 {
00116 oasys::ScopeLock l(&lock_, "TCPTunnel::handle_bundle");
00117
00118 log_debug("handle_bundle got %zu byte bundle", bundle->payload_.len());
00119
00120 DTNTunnel::BundleHeader hdr;
00121 memcpy(&hdr, bundle->payload_.buf(), sizeof(hdr));
00122 hdr.client_port_ = htons(hdr.client_port_);
00123 hdr.remote_port_ = htons(hdr.remote_port_);
00124
00125 Connection* conn;
00126 ConnTable::iterator i;
00127 ConnKey key(hdr.client_addr_,
00128 hdr.client_port_,
00129 hdr.remote_addr_,
00130 hdr.remote_port_);
00131
00132 i = connections_.find(key);
00133
00134 if (i == connections_.end()) {
00135 log_info("new connection %s:%d -> %s:%d",
00136 intoa(hdr.client_addr_),
00137 hdr.client_port_,
00138 intoa(hdr.remote_addr_),
00139 hdr.remote_port_);
00140
00141 conn = new Connection(this, &bundle->spec_.source,
00142 hdr.client_addr_, hdr.client_port_,
00143 hdr.remote_addr_, hdr.remote_port_);
00144 conn->start();
00145 connections_.insert(ConnTable::value_type(key, conn));
00146
00147 } else {
00148 conn = i->second;
00149 ASSERT(conn != NULL);
00150 }
00151
00152 conn->handle_bundle(bundle);
00153 return;
00154 }
00155
00156
00157 TCPTunnel::Listener::Listener(TCPTunnel* t,
00158 in_addr_t listen_addr, u_int16_t listen_port,
00159 in_addr_t remote_addr, u_int16_t remote_port)
00160 : TCPServerThread("TCPTunnel::Listener",
00161 "/dtntunnel/tcp/listener",
00162 Thread::DELETE_ON_EXIT),
00163 tcptun_(t),
00164 listen_addr_(listen_addr),
00165 listen_port_(listen_port),
00166 remote_addr_(remote_addr),
00167 remote_port_(remote_port)
00168 {
00169 bind_listen_start(listen_addr, listen_port);
00170 }
00171
00172
00173 void
00174 TCPTunnel::Listener::accepted(int fd, in_addr_t addr, u_int16_t port)
00175 {
00176 Connection* c = new Connection(tcptun_, DTNTunnel::instance()->dest_eid(),
00177 fd, addr, port, remote_addr_, remote_port_);
00178 tcptun_->new_connection(c);
00179 c->start();
00180 }
00181
00182
00183 TCPTunnel::Connection::Connection(TCPTunnel* t, dtn_endpoint_id_t* dest_eid,
00184 in_addr_t client_addr, u_int16_t client_port,
00185 in_addr_t remote_addr, u_int16_t remote_port)
00186 : Thread("TCPTunnel::Connection", Thread::DELETE_ON_EXIT),
00187 Logger("TCPTunnel::Connection", "/dtntunnel/tcp/conn"),
00188 tcptun_(t),
00189 sock_(logpath_),
00190 queue_(logpath_),
00191 next_seqno_(0),
00192 client_addr_(client_addr),
00193 client_port_(client_port),
00194 remote_addr_(remote_addr),
00195 remote_port_(remote_port)
00196 {
00197 dtn_copy_eid(&dest_eid_, dest_eid);
00198 }
00199
00200
00201 TCPTunnel::Connection::Connection(TCPTunnel* t, dtn_endpoint_id_t* dest_eid,
00202 int fd,
00203 in_addr_t client_addr, u_int16_t client_port,
00204 in_addr_t remote_addr, u_int16_t remote_port)
00205 : Thread("TCPTunnel::Connection", Thread::DELETE_ON_EXIT),
00206 Logger("TCPTunnel::Connection", "/dtntunnel/tcp/conn"),
00207 tcptun_(t),
00208 sock_(fd, client_addr, client_port, logpath_),
00209 queue_(logpath_),
00210 next_seqno_(0),
00211 client_addr_(client_addr),
00212 client_port_(client_port),
00213 remote_addr_(remote_addr),
00214 remote_port_(remote_port)
00215 {
00216 dtn_copy_eid(&dest_eid_, dest_eid);
00217 }
00218
00219
00220 TCPTunnel::Connection::~Connection()
00221 {
00222 dtn::APIBundle* b;
00223 while(queue_.try_pop(&b)) {
00224 delete b;
00225 }
00226 }
00227
00228
00229 void
00230 TCPTunnel::Connection::run()
00231 {
00232 DTNTunnel* tunnel = DTNTunnel::instance();
00233 u_int32_t send_seqno = 0;
00234 u_int32_t next_recv_seqno = 0;
00235 bool sock_eof = false;
00236
00237
00238 DTNTunnel::BundleHeader hdr;
00239 hdr.protocol_ = IPPROTO_TCP;
00240 hdr.seqno_ = 0;
00241 hdr.client_addr_ = client_addr_;
00242 hdr.client_port_ = htons(client_port_);
00243 hdr.remote_addr_ = remote_addr_;
00244 hdr.remote_port_ = htons(remote_port_);
00245
00246 if (sock_.state() != oasys::IPSocket::ESTABLISHED) {
00247 int err = sock_.connect(remote_addr_, remote_port_);
00248 if (err != 0) {
00249 log_err("error connecting to %s:%d",
00250 intoa(remote_addr_), remote_port_);
00251
00252
00253 dtn::APIBundle* b = new dtn::APIBundle();
00254 memcpy(b->payload_.buf(sizeof(hdr)), &hdr, sizeof(hdr));
00255 b->payload_.set_len(sizeof(hdr));
00256 if (tunnel->send_bundle(b, &dest_eid_) != DTN_SUCCESS) {
00257 tcptun_->kill_connection(this);
00258 exit(1);
00259 }
00260 goto done;
00261 }
00262 }
00263
00264 while (1) {
00265 struct pollfd pollfds[2];
00266
00267 struct pollfd* msg_poll = &pollfds[0];
00268 msg_poll->fd = queue_.read_fd();
00269 msg_poll->events = POLLIN;
00270 msg_poll->revents = 0;
00271
00272 struct pollfd* sock_poll = &pollfds[1];
00273 sock_poll->fd = sock_.fd();
00274 sock_poll->events = POLLIN | POLLERR;
00275 sock_poll->revents = 0;
00276
00277
00278
00279 log_debug("blocking in poll...");
00280 int nfds = sock_eof ? 1 : 2;
00281 int nready = oasys::IO::poll_multiple(pollfds, nfds, -1, NULL, logpath_);
00282 if (nready <= 0) {
00283 log_err("unexpected error in poll: %s", strerror(errno));
00284 goto done;
00285 }
00286
00287
00288 if (sock_poll->revents != 0) {
00289 dtn::APIBundle* b = new dtn::APIBundle();
00290
00291 u_int payload_len = tunnel->max_size();
00292 char* bp = b->payload_.buf(sizeof(hdr) + payload_len);
00293 int ret = sock_.read(bp + sizeof(hdr), payload_len);
00294 if (ret < 0) {
00295 log_err("error reading from socket: %s", strerror(errno));
00296 delete b;
00297 goto done;
00298 }
00299
00300 hdr.seqno_ = ntohl(send_seqno++);
00301 b->payload_.set_len(sizeof(hdr) + ret);
00302 memcpy(b->payload_.buf(), &hdr, sizeof(hdr));
00303 if (tunnel->send_bundle(b, &dest_eid_) != DTN_SUCCESS)
00304 exit(1);
00305 else
00306 log_info("sent %d byte payload to dtn", ret);
00307
00308 if (ret == 0) {
00309 sock_eof = true;
00310 }
00311 }
00312
00313
00314 if (msg_poll->revents != 0) {
00315 dtn::APIBundle* b = queue_.pop_blocking();
00316 ASSERT(b);
00317
00318 DTNTunnel::BundleHeader* recv_hdr =
00319 (DTNTunnel::BundleHeader*)b->payload_.buf();
00320
00321 u_int32_t recv_seqno = ntohl(recv_hdr->seqno_);
00322
00323
00324
00325
00326 if (recv_seqno != next_recv_seqno) {
00327 log_err("got out of order bundle: seqno %d, expected %d",
00328 recv_seqno, next_recv_seqno);
00329 delete b;
00330 goto done;
00331 }
00332 ++next_recv_seqno;
00333
00334 u_int len = b->payload_.len() - sizeof(hdr);
00335
00336 if (len == 0) {
00337 log_info("got zero byte payload... closing connection");
00338 sock_.close();
00339 delete b;
00340 goto done;
00341 }
00342
00343 int cc = sock_.writeall(b->payload_.buf() + sizeof(hdr), len);
00344 if (cc != (int)len) {
00345 log_err("error writing payload to socket: %s", strerror(errno));
00346 delete b;
00347 goto done;
00348 }
00349
00350 log_info("sent %d byte payload to client", len);
00351
00352 delete b;
00353 }
00354 }
00355
00356 done:
00357 tcptun_->kill_connection(this);
00358 }
00359
00360
00361 void
00362 TCPTunnel::Connection::handle_bundle(dtn::APIBundle* bundle)
00363 {
00364 DTNTunnel::BundleHeader* hdr =
00365 (DTNTunnel::BundleHeader*)bundle->payload_.buf();
00366
00367 u_int32_t recv_seqno = ntohl(hdr->seqno_);
00368
00369
00370 if (recv_seqno != next_seqno_)
00371 {
00372 log_info("got out of order bundle: expected seqno %d, got %d",
00373 next_seqno_, recv_seqno);
00374
00375 reorder_table_[recv_seqno] = bundle;
00376 return;
00377 }
00378
00379
00380 log_info("delivering %zu byte bundle with seqno %d",
00381 bundle->payload_.len(), recv_seqno);
00382 queue_.push_back(bundle);
00383 next_seqno_++;
00384
00385
00386
00387 ReorderTable::iterator iter;
00388 while (1) {
00389 iter = reorder_table_.find(next_seqno_);
00390 if (iter == reorder_table_.end()) {
00391 break;
00392 }
00393
00394 bundle = iter->second;
00395 log_info("delivering %zu byte bundle with seqno %d (from reorder table)",
00396 bundle->payload_.len(), next_seqno_);
00397
00398 reorder_table_.erase(iter);
00399 next_seqno_++;
00400
00401 queue_.push_back(bundle);
00402 }
00403 }
00404
00405 }
00406