00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include <sys/poll.h>
00040 #include <stdlib.h>
00041
00042 #include <oasys/io/NetUtils.h>
00043 #include <oasys/util/OptParser.h>
00044
00045 #include "TCPConvergenceLayer.h"
00046 #include "IPConvergenceLayerUtils.h"
00047
00048 namespace dtn {
00049
00050 TCPConvergenceLayer::TCPLinkParams
00051 TCPConvergenceLayer::default_link_params_(true);
00052
00053
00054 TCPConvergenceLayer::TCPLinkParams::TCPLinkParams(bool init_defaults)
00055 : StreamLinkParams(init_defaults),
00056 local_addr_(INADDR_ANY),
00057 remote_addr_(INADDR_NONE),
00058 remote_port_(TCPCL_DEFAULT_PORT)
00059 {
00060 }
00061
00062
00063 TCPConvergenceLayer::TCPConvergenceLayer()
00064 : StreamConvergenceLayer("TCPConvergenceLayer", "tcp", TCPCL_VERSION)
00065 {
00066 }
00067
00068
00069 ConnectionConvergenceLayer::LinkParams*
00070 TCPConvergenceLayer::new_link_params()
00071 {
00072 return new TCPLinkParams(default_link_params_);
00073 }
00074
00075
00076 bool
00077 TCPConvergenceLayer::parse_link_params(LinkParams* lparams,
00078 int argc, const char** argv,
00079 const char** invalidp)
00080 {
00081 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(lparams);
00082 ASSERT(params != NULL);
00083
00084 oasys::OptParser p;
00085
00086 p.addopt(new oasys::InAddrOpt("local_addr", ¶ms->local_addr_));
00087
00088 int count = p.parse_and_shift(argc, argv, invalidp);
00089 if (count == -1) {
00090 return false;
00091 }
00092 argc -= count;
00093
00094 if (params->local_addr_ == INADDR_NONE) {
00095 log_err("invalid local address setting of INADDR_NONE");
00096 return false;
00097 }
00098
00099
00100 return StreamConvergenceLayer::parse_link_params(lparams, argc, argv,
00101 invalidp);
00102 }
00103
00104
00105 void
00106 TCPConvergenceLayer::dump_link(Link* link, oasys::StringBuffer* buf)
00107 {
00108 StreamConvergenceLayer::dump_link(link, buf);
00109
00110 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(link->cl_info());
00111 ASSERT(params != NULL);
00112
00113 buf->appendf("local_addr: %s\n", intoa(params->local_addr_));
00114 buf->appendf("remote_addr: %s\n", intoa(params->remote_addr_));
00115 buf->appendf("remote_port: %d\n", params->remote_port_);
00116 }
00117
00118
00119 bool
00120 TCPConvergenceLayer::set_link_defaults(int argc, const char* argv[],
00121 const char** invalidp)
00122 {
00123 return parse_link_params(&default_link_params_, argc, argv, invalidp);
00124 }
00125
00126
00127 bool
00128 TCPConvergenceLayer::parse_nexthop(Link* link, LinkParams* lparams)
00129 {
00130 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(lparams);
00131 ASSERT(params != NULL);
00132
00133 if (! IPConvergenceLayerUtils::parse_nexthop(link->nexthop(),
00134 ¶ms->remote_addr_,
00135 ¶ms->remote_port_)) {
00136 log_err("invalid next hop address '%s'", link->nexthop());
00137 return false;
00138 }
00139
00140 if (params->remote_addr_ == INADDR_ANY ||
00141 params->remote_addr_ == INADDR_NONE)
00142 {
00143 log_err("invalid host in next hop address '%s'", link->nexthop());
00144 return false;
00145 }
00146
00147
00148 if (params->remote_port_ == 0) {
00149 log_err("port not specified in next hop address '%s'",
00150 link->nexthop());
00151 return false;
00152 }
00153
00154 return true;
00155 }
00156
00157
00158 CLConnection*
00159 TCPConvergenceLayer::new_connection(LinkParams* p)
00160 {
00161 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(p);
00162 ASSERT(params != NULL);
00163 return new Connection(this, params);
00164 }
00165
00166
00167 bool
00168 TCPConvergenceLayer::interface_up(Interface* iface,
00169 int argc, const char* argv[])
00170 {
00171 log_debug("adding interface %s", iface->name().c_str());
00172 in_addr_t local_addr = INADDR_ANY;
00173 u_int16_t local_port = TCPCL_DEFAULT_PORT;
00174
00175 oasys::OptParser p;
00176 p.addopt(new oasys::InAddrOpt("local_addr", &local_addr));
00177 p.addopt(new oasys::UInt16Opt("local_port", &local_port));
00178
00179 const char* invalid = NULL;
00180 if (! p.parse(argc, argv, &invalid)) {
00181 log_err("error parsing interface options: invalid option '%s'",
00182 invalid);
00183 return false;
00184 }
00185
00186
00187 if (local_addr == INADDR_NONE) {
00188 log_err("invalid local address setting of INADDR_NONE");
00189 return false;
00190 }
00191
00192 if (local_port == 0) {
00193 log_err("invalid local port setting of 0");
00194 return false;
00195 }
00196
00197
00198 Listener* listener = new Listener(this);
00199 listener->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
00200
00201 int ret = listener->bind(local_addr, local_port);
00202
00203
00204
00205 if (ret != 0 && errno == EADDRINUSE) {
00206 listener->logf(oasys::LOG_WARN,
00207 "WARNING: error binding to requested socket: %s",
00208 strerror(errno));
00209 listener->logf(oasys::LOG_WARN,
00210 "waiting for 10 seconds then trying again");
00211 sleep(10);
00212
00213 ret = listener->bind(local_addr, local_port);
00214 }
00215
00216 if (ret != 0) {
00217 return false;
00218 }
00219
00220
00221 listener->listen();
00222 listener->start();
00223
00224
00225
00226 iface->set_cl_info(listener);
00227
00228 return true;
00229 }
00230
00231
00232 bool
00233 TCPConvergenceLayer::interface_down(Interface* iface)
00234 {
00235
00236
00237
00238
00239 Listener* listener = dynamic_cast<Listener*>(iface->cl_info());
00240 ASSERT(listener != NULL);
00241
00242 listener->set_should_stop();
00243
00244 listener->interrupt_from_io();
00245
00246 while (! listener->is_stopped()) {
00247 oasys::Thread::yield();
00248 }
00249
00250 delete listener;
00251 return true;
00252 }
00253
00254
00255 void
00256 TCPConvergenceLayer::dump_interface(Interface* iface,
00257 oasys::StringBuffer* buf)
00258 {
00259 Listener* listener = dynamic_cast<Listener*>(iface->cl_info());
00260 ASSERT(listener != NULL);
00261
00262 buf->appendf("\tlocal_addr: %s local_port: %d\n",
00263 intoa(listener->local_addr()), listener->local_port());
00264 }
00265
00266
00267 TCPConvergenceLayer::Listener::Listener(TCPConvergenceLayer* cl)
00268 : IOHandlerBase(new oasys::Notifier("/dtn/cl/tcp/listener")),
00269 TCPServerThread("TCPConvergenceLayer::Listener",
00270 "/dtn/cl/tcp/listener"),
00271 cl_(cl)
00272 {
00273 logfd_ = false;
00274 }
00275
00276
00277 void
00278 TCPConvergenceLayer::Listener::accepted(int fd, in_addr_t addr, u_int16_t port)
00279 {
00280 log_debug("new connection from %s:%d", intoa(addr), port);
00281
00282 Connection* conn =
00283 new Connection(cl_, &TCPConvergenceLayer::default_link_params_,
00284 fd, addr, port);
00285 conn->start();
00286 }
00287
00288
00289 TCPConvergenceLayer::Connection::Connection(TCPConvergenceLayer* cl,
00290 TCPLinkParams* params)
00291 : StreamConvergenceLayer::Connection("TCPConvergenceLayer::Connection",
00292 cl->logpath(), cl, params,
00293 true )
00294 {
00295 logpathf("%s/conn/%p", cl->logpath(), this);
00296
00297
00298 oasys::StringBuffer nexthop("%s:%d",
00299 intoa(params->remote_addr_),
00300 params->remote_port_);
00301 set_nexthop(nexthop.c_str());
00302
00303
00304 sock_ = new oasys::TCPClient(logpath_);
00305
00306
00307
00308
00309
00310
00311 sock_->logpathf("%s/sock", logpath_);
00312 sock_->set_logfd(false);
00313
00314
00315
00316
00317 sock_->set_remote_addr(params->remote_addr_);
00318 sock_->set_remote_port(params->remote_port_);
00319
00320 sock_->init_socket();
00321 sock_->set_nonblocking(true);
00322
00323
00324
00325
00326 if (params->local_addr_ != INADDR_ANY)
00327 {
00328 if (sock_->bind(params->local_addr_, 0) != 0) {
00329 log_err("error binding to %s: %s",
00330 intoa(params->local_addr_),
00331 strerror(errno));
00332 }
00333 }
00334 }
00335
00336
00337 TCPConvergenceLayer::Connection::Connection(TCPConvergenceLayer* cl,
00338 TCPLinkParams* params,
00339 int fd,
00340 in_addr_t remote_addr,
00341 u_int16_t remote_port)
00342 : StreamConvergenceLayer::Connection("TCPConvergenceLayer::Connection",
00343 cl->logpath(), cl, params,
00344 false )
00345 {
00346 logpathf("%s/conn/%p", cl->logpath(), this);
00347
00348
00349 oasys::StringBuffer nexthop("%s:%d", intoa(remote_addr), remote_port);
00350 set_nexthop(nexthop.c_str());
00351
00352 sock_ = new oasys::TCPClient(fd, remote_addr, remote_port, logpath_);
00353 sock_->set_logfd(false);
00354 sock_->set_nonblocking(true);
00355 }
00356
00357
00358 TCPConvergenceLayer::Connection::~Connection()
00359 {
00360 delete sock_;
00361 }
00362
00363
00364 void
00365 TCPConvergenceLayer::Connection::initialize_pollfds()
00366 {
00367 sock_pollfd_ = &pollfds_[0];
00368 num_pollfds_ = 1;
00369
00370 sock_pollfd_->fd = sock_->fd();
00371 sock_pollfd_->events = POLLIN;
00372
00373 TCPLinkParams* params = dynamic_cast<TCPLinkParams*>(params_);
00374 ASSERT(params != NULL);
00375
00376 poll_timeout_ = params->data_timeout_;
00377
00378 if (params->keepalive_interval_ != 0 &&
00379 (params->keepalive_interval_ * 1000) < params->data_timeout_)
00380 {
00381 poll_timeout_ = params->keepalive_interval_ * 1000;
00382 }
00383 }
00384
00385
00386 void
00387 TCPConvergenceLayer::Connection::connect()
00388 {
00389
00390
00391
00392 log_debug("connect: connecting to %s:%d...",
00393 intoa(sock_->remote_addr()), sock_->remote_port());
00394 ASSERT(contact_ != NULL);
00395 ASSERT(contact_->link()->isopening());
00396 ASSERT(sock_->state() != oasys::IPSocket::ESTABLISHED);
00397 int ret = sock_->connect(sock_->remote_addr(), sock_->remote_port());
00398
00399 if (ret == 0) {
00400 log_debug("connect: succeeded immediately");
00401 ASSERT(sock_->state() == oasys::IPSocket::ESTABLISHED);
00402
00403 initiate_contact();
00404
00405 } else if (ret == -1 && errno == EINPROGRESS) {
00406 log_debug("connect: EINPROGRESS returned, waiting for write ready");
00407 sock_pollfd_->events |= POLLOUT;
00408
00409 } else {
00410 log_info("connection attempt to %s:%d failed... %s",
00411 intoa(sock_->remote_addr()), sock_->remote_port(),
00412 strerror(errno));
00413 break_contact(ContactEvent::BROKEN);
00414 }
00415 }
00416
00417
00418 void
00419 TCPConvergenceLayer::Connection::accept()
00420 {
00421 ASSERT(sock_->state() == oasys::IPSocket::ESTABLISHED);
00422
00423 log_debug("accept: got connection from %s:%d...",
00424 intoa(sock_->remote_addr()), sock_->remote_port());
00425 initiate_contact();
00426 }
00427
00428
00429 void
00430 TCPConvergenceLayer::Connection::disconnect()
00431 {
00432 if (sock_->state() != oasys::IPSocket::CLOSED) {
00433
00434
00435
00436 if (send_segment_todo_ == 0) {
00437 log_debug("disconnect: trying to send shutdown byte");
00438 char typecode = SHUTDOWN;
00439 sock_->write(&typecode, 1);
00440 }
00441
00442 sock_->close();
00443 }
00444 }
00445
00446
00447 void
00448 TCPConvergenceLayer::Connection::handle_poll_activity()
00449 {
00450 if (sock_pollfd_->revents & POLLHUP) {
00451 log_info("remote socket closed connection -- returned POLLHUP");
00452 break_contact(ContactEvent::BROKEN);
00453 return;
00454 }
00455
00456 if (sock_pollfd_->revents & POLLERR) {
00457 log_info("error condition on remote socket -- returned POLLERR");
00458 break_contact(ContactEvent::BROKEN);
00459 return;
00460 }
00461
00462
00463
00464
00465 if (sock_pollfd_->revents & POLLOUT)
00466 {
00467 log_debug("poll returned write ready, clearing POLLOUT bit");
00468 sock_pollfd_->events &= ~POLLOUT;
00469
00470 if (sock_->state() == oasys::IPSocket::CONNECTING) {
00471 int result = sock_->async_connect_result();
00472 if (result == 0) {
00473 log_debug("delayed_connect to %s:%d succeeded",
00474 intoa(sock_->remote_addr()), sock_->remote_port());
00475 initiate_contact();
00476
00477 } else {
00478 log_info("connection attempt to %s:%d failed... %s",
00479 intoa(sock_->remote_addr()), sock_->remote_port(),
00480 strerror(errno));
00481 break_contact(ContactEvent::BROKEN);
00482 }
00483
00484 return;
00485 }
00486
00487 send_data();
00488 }
00489
00490
00491 if (sock_pollfd_->revents & POLLIN) {
00492 recv_data();
00493 process_data();
00494 }
00495 }
00496
00497
00498 void
00499 TCPConvergenceLayer::Connection::send_data()
00500 {
00501
00502
00503
00504 ASSERT(! contact_broken_);
00505
00506 if (params_->test_write_delay_ != 0) {
00507 log_debug("send_data: sleeping for test_write_delay msecs %u",
00508 params_->test_write_delay_);
00509
00510 usleep(params_->test_write_delay_ * 1000);
00511 }
00512
00513 log_debug("send_data: trying to drain %zu bytes from send buffer...",
00514 sendbuf_.fullbytes());
00515 ASSERT(sendbuf_.fullbytes() > 0);
00516 int cc = sock_->write(sendbuf_.start(), sendbuf_.fullbytes());
00517 if (cc > 0) {
00518 log_debug("send_data: wrote %d/%zu bytes from send buffer",
00519 cc, sendbuf_.fullbytes());
00520 sendbuf_.consume(cc);
00521
00522 if (sendbuf_.fullbytes() != 0) {
00523 log_debug("send_data: incomplete write, setting POLLOUT bit");
00524 sock_pollfd_->events |= POLLOUT;
00525
00526 } else {
00527 if (sock_pollfd_->events & POLLOUT) {
00528 log_debug("send_data: drained buffer, clearing POLLOUT bit");
00529 sock_pollfd_->events &= ~POLLOUT;
00530 }
00531 }
00532 } else if (errno == EWOULDBLOCK) {
00533 log_debug("send_data: write returned EWOULDBLOCK, setting POLLOUT bit");
00534 sock_pollfd_->events |= POLLOUT;
00535
00536 } else {
00537 log_info("send_data: remote connection unexpectedly closed: %s",
00538 strerror(errno));
00539 break_contact(ContactEvent::BROKEN);
00540 }
00541 }
00542
00543
00544 void
00545 TCPConvergenceLayer::Connection::recv_data()
00546 {
00547
00548
00549
00550 ASSERT(! contact_broken_);
00551
00552
00553 if (recvbuf_.tailbytes() == 0) {
00554 log_err("no space in receive buffer to accept data!!!");
00555 return;
00556 }
00557
00558 if (params_->test_read_delay_ != 0) {
00559 log_debug("recv_data: sleeping for test_read_delay msecs %u",
00560 params_->test_read_delay_);
00561
00562 usleep(params_->test_read_delay_ * 1000);
00563 }
00564
00565 log_debug("recv_data: draining up to %zu bytes into recv buffer...",
00566 recvbuf_.tailbytes());
00567 int cc = sock_->read(recvbuf_.end(), recvbuf_.tailbytes());
00568 if (cc < 1) {
00569 log_info("remote connection unexpectedly closed");
00570 break_contact(ContactEvent::BROKEN);
00571 return;
00572 }
00573
00574 log_debug("recv_data: read %d bytes, rcvbuf has %zu bytes",
00575 cc, recvbuf_.fullbytes());
00576 recvbuf_.fill(cc);
00577 }
00578
00579 }