00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <sys/poll.h>
00019 #include <stdlib.h>
00020
00021 #include <oasys/io/NetUtils.h>
00022 #include <oasys/util/OptParser.h>
00023
00024 #include "TCPConvergenceLayer.h"
00025 #include "IPConvergenceLayerUtils.h"
00026 #include "bundling/BundleDaemon.h"
00027 #include "contacts/ContactManager.h"
00028
00029 namespace dtn {
00030
00031 TCPConvergenceLayer::TCPLinkParams
00032 TCPConvergenceLayer::default_link_params_(true);
00033
00034
00035 TCPConvergenceLayer::TCPLinkParams::TCPLinkParams(bool init_defaults)
00036 : StreamLinkParams(init_defaults),
00037 local_addr_(INADDR_ANY),
00038 remote_addr_(INADDR_NONE),
00039 remote_port_(TCPCL_DEFAULT_PORT)
00040 {
00041 }
00042
00043
00044 TCPConvergenceLayer::TCPConvergenceLayer()
00045 : StreamConvergenceLayer("TCPConvergenceLayer", "tcp", TCPCL_VERSION)
00046 {
00047 }
00048
00049
00050 ConnectionConvergenceLayer::LinkParams*
00051 TCPConvergenceLayer::new_link_params()
00052 {
00053 return new TCPLinkParams(default_link_params_);
00054 }
00055
00056
00057 bool
00058 TCPConvergenceLayer::parse_link_params(LinkParams* lparams,
00059 int argc, const char** argv,
00060 const char** invalidp)
00061 {
00062 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(lparams);
00063 ASSERT(params != NULL);
00064
00065 oasys::OptParser p;
00066
00067 p.addopt(new oasys::InAddrOpt("local_addr", ¶ms->local_addr_));
00068
00069 int count = p.parse_and_shift(argc, argv, invalidp);
00070 if (count == -1) {
00071 return false;
00072 }
00073 argc -= count;
00074
00075 if (params->local_addr_ == INADDR_NONE) {
00076 log_err("invalid local address setting of INADDR_NONE");
00077 return false;
00078 }
00079
00080
00081 return StreamConvergenceLayer::parse_link_params(lparams, argc, argv,
00082 invalidp);
00083 }
00084
00085
00086 void
00087 TCPConvergenceLayer::dump_link(Link* link, oasys::StringBuffer* buf)
00088 {
00089 StreamConvergenceLayer::dump_link(link, buf);
00090
00091 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(link->cl_info());
00092 ASSERT(params != NULL);
00093
00094 buf->appendf("local_addr: %s\n", intoa(params->local_addr_));
00095 buf->appendf("remote_addr: %s\n", intoa(params->remote_addr_));
00096 buf->appendf("remote_port: %d\n", params->remote_port_);
00097 }
00098
00099
00100 bool
00101 TCPConvergenceLayer::set_link_defaults(int argc, const char* argv[],
00102 const char** invalidp)
00103 {
00104 return parse_link_params(&default_link_params_, argc, argv, invalidp);
00105 }
00106
00107
00108 bool
00109 TCPConvergenceLayer::parse_nexthop(Link* link, LinkParams* lparams)
00110 {
00111 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(lparams);
00112 ASSERT(params != NULL);
00113
00114 if (params->remote_addr_ == INADDR_NONE || params->remote_port_ == 0)
00115 {
00116 if (! IPConvergenceLayerUtils::parse_nexthop(logpath_, link->nexthop(),
00117 ¶ms->remote_addr_,
00118 ¶ms->remote_port_)) {
00119 return false;
00120 }
00121 }
00122
00123 if (params->remote_addr_ == INADDR_ANY ||
00124 params->remote_addr_ == INADDR_NONE)
00125 {
00126 log_warn("can't lookup hostname in next hop address '%s'",
00127 link->nexthop());
00128 return false;
00129 }
00130
00131
00132 if (params->remote_port_ == 0) {
00133 log_err("port not specified in next hop address '%s'",
00134 link->nexthop());
00135 return false;
00136 }
00137
00138 return true;
00139 }
00140
00141
00142 CLConnection*
00143 TCPConvergenceLayer::new_connection(LinkParams* p)
00144 {
00145 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(p);
00146 ASSERT(params != NULL);
00147 return new Connection(this, params);
00148 }
00149
00150
00151 bool
00152 TCPConvergenceLayer::interface_up(Interface* iface,
00153 int argc, const char* argv[])
00154 {
00155 log_debug("adding interface %s", iface->name().c_str());
00156 in_addr_t local_addr = INADDR_ANY;
00157 u_int16_t local_port = TCPCL_DEFAULT_PORT;
00158
00159 oasys::OptParser p;
00160 p.addopt(new oasys::InAddrOpt("local_addr", &local_addr));
00161 p.addopt(new oasys::UInt16Opt("local_port", &local_port));
00162
00163 const char* invalid = NULL;
00164 if (! p.parse(argc, argv, &invalid)) {
00165 log_err("error parsing interface options: invalid option '%s'",
00166 invalid);
00167 return false;
00168 }
00169
00170
00171 if (local_addr == INADDR_NONE) {
00172 log_err("invalid local address setting of INADDR_NONE");
00173 return false;
00174 }
00175
00176 if (local_port == 0) {
00177 log_err("invalid local port setting of 0");
00178 return false;
00179 }
00180
00181
00182 Listener* listener = new Listener(this);
00183 listener->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
00184
00185 int ret = listener->bind(local_addr, local_port);
00186
00187
00188
00189 if (ret != 0 && errno == EADDRINUSE) {
00190 listener->logf(oasys::LOG_WARN,
00191 "WARNING: error binding to requested socket: %s",
00192 strerror(errno));
00193 listener->logf(oasys::LOG_WARN,
00194 "waiting for 10 seconds then trying again");
00195 sleep(10);
00196
00197 ret = listener->bind(local_addr, local_port);
00198 }
00199
00200 if (ret != 0) {
00201 return false;
00202 }
00203
00204
00205 listener->listen();
00206 listener->start();
00207
00208
00209
00210 iface->set_cl_info(listener);
00211
00212 return true;
00213 }
00214
00215
00216 bool
00217 TCPConvergenceLayer::interface_down(Interface* iface)
00218 {
00219
00220
00221
00222
00223 Listener* listener = dynamic_cast<Listener*>(iface->cl_info());
00224 ASSERT(listener != NULL);
00225
00226 listener->set_should_stop();
00227
00228 listener->interrupt_from_io();
00229
00230 while (! listener->is_stopped()) {
00231 oasys::Thread::yield();
00232 }
00233
00234 delete listener;
00235 return true;
00236 }
00237
00238
00239 void
00240 TCPConvergenceLayer::dump_interface(Interface* iface,
00241 oasys::StringBuffer* buf)
00242 {
00243 Listener* listener = dynamic_cast<Listener*>(iface->cl_info());
00244 ASSERT(listener != NULL);
00245
00246 buf->appendf("\tlocal_addr: %s local_port: %d\n",
00247 intoa(listener->local_addr()), listener->local_port());
00248 }
00249
00250
00251 TCPConvergenceLayer::Listener::Listener(TCPConvergenceLayer* cl)
00252 : IOHandlerBase(new oasys::Notifier("/dtn/cl/tcp/listener")),
00253 TCPServerThread("TCPConvergenceLayer::Listener",
00254 "/dtn/cl/tcp/listener"),
00255 cl_(cl)
00256 {
00257 logfd_ = false;
00258 }
00259
00260
00261 void
00262 TCPConvergenceLayer::Listener::accepted(int fd, in_addr_t addr, u_int16_t port)
00263 {
00264 log_debug("new connection from %s:%d", intoa(addr), port);
00265
00266 Connection* conn =
00267 new Connection(cl_, &TCPConvergenceLayer::default_link_params_,
00268 fd, addr, port);
00269 conn->start();
00270 }
00271
00272
00273 TCPConvergenceLayer::Connection::Connection(TCPConvergenceLayer* cl,
00274 TCPLinkParams* params)
00275 : StreamConvergenceLayer::Connection("TCPConvergenceLayer::Connection",
00276 cl->logpath(), cl, params,
00277 true )
00278 {
00279 logpathf("%s/conn/%p", cl->logpath(), this);
00280
00281
00282 oasys::StringBuffer nexthop("%s:%d",
00283 intoa(params->remote_addr_),
00284 params->remote_port_);
00285 set_nexthop(nexthop.c_str());
00286
00287
00288 sock_ = new oasys::TCPClient(logpath_);
00289
00290
00291
00292
00293
00294
00295 sock_->logpathf("%s/sock", logpath_);
00296 sock_->set_logfd(false);
00297
00298 sock_->init_socket();
00299 sock_->set_nonblocking(true);
00300
00301
00302
00303
00304 if (params->local_addr_ != INADDR_ANY)
00305 {
00306 if (sock_->bind(params->local_addr_, 0) != 0) {
00307 log_err("error binding to %s: %s",
00308 intoa(params->local_addr_),
00309 strerror(errno));
00310 }
00311 }
00312 }
00313
00314
00315 TCPConvergenceLayer::Connection::Connection(TCPConvergenceLayer* cl,
00316 TCPLinkParams* params,
00317 int fd,
00318 in_addr_t remote_addr,
00319 u_int16_t remote_port)
00320 : StreamConvergenceLayer::Connection("TCPConvergenceLayer::Connection",
00321 cl->logpath(), cl, params,
00322 false )
00323 {
00324 logpathf("%s/conn/%p", cl->logpath(), this);
00325
00326
00327 oasys::StringBuffer nexthop("%s:%d", intoa(remote_addr), remote_port);
00328 set_nexthop(nexthop.c_str());
00329
00330 sock_ = new oasys::TCPClient(fd, remote_addr, remote_port, logpath_);
00331 sock_->set_logfd(false);
00332 sock_->set_nonblocking(true);
00333 }
00334
00335
00336 TCPConvergenceLayer::Connection::~Connection()
00337 {
00338 delete sock_;
00339 }
00340
00341
00342 void
00343 TCPConvergenceLayer::Connection::serialize(oasys::SerializeAction *a)
00344 {
00345 TCPLinkParams *params = tcp_lparams();
00346 if (! params) return;
00347
00348 oasys::Intoa local_addr = oasys::Intoa(params->local_addr_);
00349 const char * local_addr_str = local_addr.buf();
00350 oasys::Intoa remote_addr = oasys::Intoa(params->remote_addr_);
00351 const char * remote_addr_str = remote_addr.buf();
00352
00353 a->process("local_addr",
00354 (u_char *) local_addr_str, strlen(local_addr_str));
00355 a->process("remote_addr",
00356 (u_char *) remote_addr_str, strlen(remote_addr_str));
00357 a->process("remote_port", ¶ms->remote_port_);
00358
00359
00360 a->process("segment_ack_enabled", ¶ms->segment_ack_enabled_);
00361 a->process("negative_ack_enabled", ¶ms->negative_ack_enabled_);
00362 a->process("keepalive_interval", ¶ms->keepalive_interval_);
00363 a->process("segment_length", ¶ms->segment_length_);
00364
00365
00366 a->process("busy_queue_depth", ¶ms->busy_queue_depth_);
00367 a->process("reactive_frag_enabled", ¶ms->reactive_frag_enabled_);
00368 a->process("sendbuf_length", ¶ms->sendbuf_len_);
00369 a->process("recvbuf_length", ¶ms->recvbuf_len_);
00370 a->process("data_timeout", ¶ms->data_timeout_);
00371 }
00372
00373
00374 void
00375 TCPConvergenceLayer::Connection::initialize_pollfds()
00376 {
00377 sock_pollfd_ = &pollfds_[0];
00378 num_pollfds_ = 1;
00379
00380 sock_pollfd_->fd = sock_->fd();
00381 sock_pollfd_->events = POLLIN;
00382
00383 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(params_);
00384 ASSERT(params != NULL);
00385
00386 poll_timeout_ = params->data_timeout_;
00387
00388 if (params->keepalive_interval_ != 0 &&
00389 (params->keepalive_interval_ * 1000) < params->data_timeout_)
00390 {
00391 poll_timeout_ = params->keepalive_interval_ * 1000;
00392 }
00393 }
00394
00395
00396 void
00397 TCPConvergenceLayer::Connection::connect()
00398 {
00399
00400
00401 if (! cl_->parse_nexthop(contact_->link(), params_)) {
00402 log_info("can't resolve nexthop address '%s'",
00403 contact_->link()->nexthop());
00404 break_contact(ContactEvent::BROKEN);
00405 return;
00406 }
00407
00408
00409 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(params_);
00410 ASSERT(params != NULL);
00411 sock_->set_remote_addr(params->remote_addr_);
00412 sock_->set_remote_port(params->remote_port_);
00413
00414
00415
00416
00417 log_debug("connect: connecting to %s:%d...",
00418 intoa(sock_->remote_addr()), sock_->remote_port());
00419 ASSERT(contact_ == NULL || contact_->link()->isopening());
00420 ASSERT(sock_->state() != oasys::IPSocket::ESTABLISHED);
00421 int ret = sock_->connect(sock_->remote_addr(), sock_->remote_port());
00422
00423 if (ret == 0) {
00424 log_debug("connect: succeeded immediately");
00425 ASSERT(sock_->state() == oasys::IPSocket::ESTABLISHED);
00426
00427 initiate_contact();
00428
00429 } else if (ret == -1 && errno == EINPROGRESS) {
00430 log_debug("connect: EINPROGRESS returned, waiting for write ready");
00431 sock_pollfd_->events |= POLLOUT;
00432
00433 } else {
00434 log_info("connection attempt to %s:%d failed... %s",
00435 intoa(sock_->remote_addr()), sock_->remote_port(),
00436 strerror(errno));
00437 break_contact(ContactEvent::BROKEN);
00438 }
00439 }
00440
00441
00442 void
00443 TCPConvergenceLayer::Connection::accept()
00444 {
00445 ASSERT(sock_->state() == oasys::IPSocket::ESTABLISHED);
00446
00447 log_debug("accept: got connection from %s:%d...",
00448 intoa(sock_->remote_addr()), sock_->remote_port());
00449 initiate_contact();
00450 }
00451
00452
00453 void
00454 TCPConvergenceLayer::Connection::disconnect()
00455 {
00456 if (sock_->state() != oasys::IPSocket::CLOSED) {
00457 sock_->close();
00458 }
00459 }
00460
00461
00462 void
00463 TCPConvergenceLayer::Connection::handle_poll_activity()
00464 {
00465 if (sock_pollfd_->revents & POLLHUP) {
00466 log_info("remote socket closed connection -- returned POLLHUP");
00467 break_contact(ContactEvent::BROKEN);
00468 return;
00469 }
00470
00471 if (sock_pollfd_->revents & POLLERR) {
00472 log_info("error condition on remote socket -- returned POLLERR");
00473 break_contact(ContactEvent::BROKEN);
00474 return;
00475 }
00476
00477
00478
00479
00480 if (sock_pollfd_->revents & POLLOUT)
00481 {
00482 log_debug("poll returned write ready, clearing POLLOUT bit");
00483 sock_pollfd_->events &= ~POLLOUT;
00484
00485 if (sock_->state() == oasys::IPSocket::CONNECTING) {
00486 int result = sock_->async_connect_result();
00487 if (result == 0 && sendbuf_.fullbytes() == 0) {
00488 log_debug("delayed_connect to %s:%d succeeded",
00489 intoa(sock_->remote_addr()), sock_->remote_port());
00490 initiate_contact();
00491
00492 } else {
00493 log_info("connection attempt to %s:%d failed... %s",
00494 intoa(sock_->remote_addr()), sock_->remote_port(),
00495 strerror(errno));
00496 break_contact(ContactEvent::BROKEN);
00497 }
00498
00499 return;
00500 }
00501
00502 send_data();
00503 }
00504
00505
00506 if (contact_broken_)
00507 {
00508 return;
00509 }
00510
00511
00512 if (sock_pollfd_->revents & POLLIN) {
00513 recv_data();
00514 process_data();
00515
00516
00517
00518 if (recvbuf_.tailbytes() == 0) {
00519 log_err("process_data left no space in recvbuf!!");
00520 }
00521
00522 if (! contact_broken_) {
00523 check_keepalive();
00524 }
00525
00526 }
00527
00528 }
00529
00530
00531 void
00532 TCPConvergenceLayer::Connection::send_data()
00533 {
00534
00535
00536
00537 ASSERT(! contact_broken_);
00538
00539 if (params_->test_write_delay_ != 0) {
00540 log_debug("send_data: sleeping for test_write_delay msecs %u",
00541 params_->test_write_delay_);
00542
00543 usleep(params_->test_write_delay_ * 1000);
00544 }
00545
00546 log_debug("send_data: trying to drain %zu bytes from send buffer...",
00547 sendbuf_.fullbytes());
00548 ASSERT(sendbuf_.fullbytes() > 0);
00549 int cc = sock_->write(sendbuf_.start(), sendbuf_.fullbytes());
00550 if (cc > 0) {
00551 log_debug("send_data: wrote %d/%zu bytes from send buffer",
00552 cc, sendbuf_.fullbytes());
00553 sendbuf_.consume(cc);
00554
00555 if (sendbuf_.fullbytes() != 0) {
00556 log_debug("send_data: incomplete write, setting POLLOUT bit");
00557 sock_pollfd_->events |= POLLOUT;
00558
00559 } else {
00560 if (sock_pollfd_->events & POLLOUT) {
00561 log_debug("send_data: drained buffer, clearing POLLOUT bit");
00562 sock_pollfd_->events &= ~POLLOUT;
00563 }
00564 }
00565 } else if (errno == EWOULDBLOCK) {
00566 log_debug("send_data: write returned EWOULDBLOCK, setting POLLOUT bit");
00567 sock_pollfd_->events |= POLLOUT;
00568
00569 } else {
00570 log_info("send_data: remote connection unexpectedly closed: %s",
00571 strerror(errno));
00572 break_contact(ContactEvent::BROKEN);
00573 }
00574 }
00575
00576
00577 void
00578 TCPConvergenceLayer::Connection::recv_data()
00579 {
00580
00581
00582
00583 ASSERT(! contact_broken_);
00584
00585
00586 if (recvbuf_.tailbytes() == 0) {
00587 log_err("no space in receive buffer to accept data!!!");
00588 return;
00589 }
00590
00591 if (params_->test_read_delay_ != 0) {
00592 log_debug("recv_data: sleeping for test_read_delay msecs %u",
00593 params_->test_read_delay_);
00594
00595 usleep(params_->test_read_delay_ * 1000);
00596 }
00597
00598 log_debug("recv_data: draining up to %zu bytes into recv buffer...",
00599 recvbuf_.tailbytes());
00600 int cc = sock_->read(recvbuf_.end(), recvbuf_.tailbytes());
00601 if (cc < 1) {
00602 log_info("remote connection unexpectedly closed");
00603 break_contact(ContactEvent::BROKEN);
00604 return;
00605 }
00606
00607 log_debug("recv_data: read %d bytes, rcvbuf has %zu bytes",
00608 cc, recvbuf_.fullbytes());
00609 recvbuf_.fill(cc);
00610 }
00611
00612 }