00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include <sys/poll.h>
00018
00019 #include <oasys/io/NetUtils.h>
00020 #include <oasys/thread/Timer.h>
00021 #include <oasys/util/OptParser.h>
00022 #include <oasys/util/StringBuffer.h>
00023 #include <oasys/util/URL.h>
00024
00025 #include "UDPConvergenceLayer.h"
00026 #include "bundling/Bundle.h"
00027 #include "bundling/BundleEvent.h"
00028 #include "bundling/BundleDaemon.h"
00029 #include "bundling/BundleList.h"
00030 #include "bundling/BundleProtocol.h"
00031
00032 namespace dtn {
00033
00034 struct UDPConvergenceLayer::Params UDPConvergenceLayer::defaults_;
00035
00036
00037 void
00038 UDPConvergenceLayer::Params::serialize(oasys::SerializeAction *a)
00039 {
00040 oasys::Intoa local_addr(local_addr_);
00041 const char *local_addr_str = local_addr.buf();
00042
00043 oasys::Intoa remote_addr(remote_addr_);
00044 const char *remote_addr_str = remote_addr.buf();
00045
00046 a->process("local_addr",
00047 (u_char *) local_addr_str, strlen(local_addr_str));
00048 a->process("remote_addr",
00049 (u_char *) remote_addr_str, strlen(remote_addr_str));
00050 a->process("local_port", &local_port_);
00051 a->process("remote_port", &remote_port_);
00052 a->process("rate", &rate_);
00053 a->process("bucket_depth", &bucket_depth_);
00054 }
00055
00056
00057 UDPConvergenceLayer::UDPConvergenceLayer()
00058 : IPConvergenceLayer("UDPConvergenceLayer", "udp")
00059 {
00060 defaults_.local_addr_ = INADDR_ANY;
00061 defaults_.local_port_ = UDPCL_DEFAULT_PORT;
00062 defaults_.remote_addr_ = INADDR_NONE;
00063 defaults_.remote_port_ = 0;
00064 defaults_.rate_ = 0;
00065 defaults_.bucket_depth_ = 0;
00066 }
00067
00068
00069 bool
00070 UDPConvergenceLayer::parse_params(Params* params,
00071 int argc, const char** argv,
00072 const char** invalidp)
00073 {
00074 oasys::OptParser p;
00075
00076 p.addopt(new oasys::InAddrOpt("local_addr", ¶ms->local_addr_));
00077 p.addopt(new oasys::UInt16Opt("local_port", ¶ms->local_port_));
00078 p.addopt(new oasys::InAddrOpt("remote_addr", ¶ms->remote_addr_));
00079 p.addopt(new oasys::UInt16Opt("remote_port", ¶ms->remote_port_));
00080 p.addopt(new oasys::UIntOpt("rate", ¶ms->rate_));
00081 p.addopt(new oasys::UIntOpt("bucket_depth_", ¶ms->bucket_depth_));
00082
00083 if (! p.parse(argc, argv, invalidp)) {
00084 return false;
00085 }
00086
00087 return true;
00088 };
00089
00090
00091 bool
00092 UDPConvergenceLayer::interface_up(Interface* iface,
00093 int argc, const char* argv[])
00094 {
00095 log_debug("adding interface %s", iface->name().c_str());
00096
00097
00098
00099 Params params = UDPConvergenceLayer::defaults_;
00100 const char* invalid;
00101 if (!parse_params(¶ms, argc, argv, &invalid)) {
00102 log_err("error parsing interface options: invalid option '%s'",
00103 invalid);
00104 return false;
00105 }
00106
00107
00108 if (params.local_addr_ == INADDR_NONE) {
00109 log_err("invalid local address setting of 0");
00110 return false;
00111 }
00112
00113 if (params.local_port_ == 0) {
00114 log_err("invalid local port setting of 0");
00115 return false;
00116 }
00117
00118
00119 Receiver* receiver = new Receiver(¶ms);
00120 receiver->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
00121
00122 if (receiver->bind(params.local_addr_, params.local_port_) != 0) {
00123 return false;
00124 }
00125
00126
00127 if (params.remote_addr_ != INADDR_NONE) {
00128 if (receiver->connect(params.remote_addr_, params.remote_port_) != 0) {
00129 return false;
00130 }
00131 }
00132
00133
00134 receiver->start();
00135
00136
00137
00138 iface->set_cl_info(receiver);
00139
00140 return true;
00141 }
00142
00143
00144 bool
00145 UDPConvergenceLayer::interface_down(Interface* iface)
00146 {
00147
00148
00149
00150
00151 Receiver* receiver = (Receiver*)iface->cl_info();
00152 receiver->set_should_stop();
00153 receiver->interrupt_from_io();
00154
00155 while (! receiver->is_stopped()) {
00156 oasys::Thread::yield();
00157 }
00158
00159 delete receiver;
00160 return true;
00161 }
00162
00163
00164 void
00165 UDPConvergenceLayer::dump_interface(Interface* iface,
00166 oasys::StringBuffer* buf)
00167 {
00168 Params* params = &((Receiver*)iface->cl_info())->params_;
00169
00170 buf->appendf("\tlocal_addr: %s local_port: %d\n",
00171 intoa(params->local_addr_), params->local_port_);
00172
00173 if (params->remote_addr_ != INADDR_NONE) {
00174 buf->appendf("\tconnected remote_addr: %s remote_port: %d\n",
00175 intoa(params->remote_addr_), params->remote_port_);
00176 } else {
00177 buf->appendf("\tnot connected\n");
00178 }
00179 }
00180
00181
00182 bool
00183 UDPConvergenceLayer::init_link(Link* link, int argc, const char* argv[])
00184 {
00185 in_addr_t addr;
00186 u_int16_t port = 0;
00187
00188 log_debug("adding %s link %s", link->type_str(), link->nexthop());
00189
00190
00191
00192
00193 parse_nexthop(link->nexthop(), &addr, &port);
00194
00195
00196
00197 Params* params = new Params(defaults_);
00198 params->local_addr_ = INADDR_NONE;
00199 params->local_port_ = 0;
00200
00201 const char* invalid;
00202 if (! parse_params(params, argc, argv, &invalid)) {
00203 log_err("error parsing link options: invalid option '%s'", invalid);
00204 delete params;
00205 return false;
00206 }
00207
00208 if (link->params().mtu_ > MAX_BUNDLE_LEN) {
00209 log_err("error parsing link options: mtu %d > maximum %d",
00210 link->params().mtu_, MAX_BUNDLE_LEN);
00211 delete params;
00212 return false;
00213 }
00214
00215 link->set_cl_info(params);
00216 return true;
00217 }
00218
00219
00220 void
00221 UDPConvergenceLayer::dump_link(Link* link, oasys::StringBuffer* buf)
00222 {
00223 Params* params = (Params*)link->cl_info();
00224
00225 buf->appendf("\tlocal_addr: %s local_port: %d\n",
00226 intoa(params->local_addr_), params->local_port_);
00227
00228 buf->appendf("\tremote_addr: %s remote_port: %d\n",
00229 intoa(params->remote_addr_), params->remote_port_);
00230 }
00231
00232
00233 bool
00234 UDPConvergenceLayer::open_contact(const ContactRef& contact)
00235 {
00236 in_addr_t addr;
00237 u_int16_t port;
00238
00239 Link* link = contact->link();
00240 log_debug("opening contact for link *%p", link);
00241
00242
00243 if (! parse_nexthop(link->nexthop(), &addr, &port)) {
00244 log_err("invalid next hop address '%s'", link->nexthop());
00245 return false;
00246 }
00247
00248
00249 if (addr == INADDR_ANY || addr == INADDR_NONE) {
00250 log_err("can't lookup hostname in next hop address '%s'",
00251 link->nexthop());
00252 return false;
00253 }
00254
00255
00256 if (port == 0) {
00257 log_err("port not specified in next hop address '%s'",
00258 link->nexthop());
00259 return false;
00260 }
00261
00262 Params* params = (Params*)link->cl_info();
00263
00264
00265 Sender* sender = new Sender(link->contact());
00266
00267 if (!sender->init(params, addr, port)) {
00268 log_err("error initializing contact");
00269 BundleDaemon::post(
00270 new LinkStateChangeRequest(link, Link::UNAVAILABLE,
00271 ContactEvent::NO_INFO));
00272 delete sender;
00273 return false;
00274 }
00275
00276 contact->set_cl_info(sender);
00277 BundleDaemon::post(new ContactUpEvent(link->contact()));
00278
00279 return true;
00280 }
00281
00282
00283 bool
00284 UDPConvergenceLayer::close_contact(const ContactRef& contact)
00285 {
00286 Sender* sender = (Sender*)contact->cl_info();
00287
00288 log_info("close_contact *%p", contact.object());
00289
00290 if (sender) {
00291 delete sender;
00292 contact->set_cl_info(NULL);
00293 }
00294
00295 return true;
00296 }
00297
00298
00299 void
00300 UDPConvergenceLayer::send_bundle(const ContactRef& contact, Bundle* bundle)
00301 {
00302 Sender* sender = (Sender*)contact->cl_info();
00303 if (!sender) {
00304 log_crit("send_bundles called on contact *%p with no Sender!!",
00305 contact.object());
00306 return;
00307 }
00308 ASSERT(contact == sender->contact_);
00309
00310 int len = sender->send_bundle(bundle);
00311
00312 if (len > 0) {
00313 BundleDaemon::post(
00314 new BundleTransmittedEvent(bundle, contact,
00315 contact->link(), len, 0));
00316 } else {
00317 BundleDaemon::post(
00318 new BundleTransmitFailedEvent(bundle, contact,contact->link()));
00319 }
00320 }
00321
00322
00323 UDPConvergenceLayer::Receiver::Receiver(UDPConvergenceLayer::Params* params)
00324 : IOHandlerBase(new oasys::Notifier("/dtn/cl/udp/receiver")),
00325 UDPClient("/dtn/cl/udp/receiver"),
00326 Thread("UDPConvergenceLayer::Receiver")
00327 {
00328 logfd_ = false;
00329 params_ = *params;
00330 }
00331
00332
00333 void
00334 UDPConvergenceLayer::Receiver::process_data(u_char* bp, size_t len)
00335 {
00336
00337 Bundle* bundle = new Bundle();
00338
00339 bool complete = false;
00340 int cc = BundleProtocol::consume(bundle, bp, len, &complete);
00341
00342 if (cc < 0) {
00343 log_err("process_data: bundle protocol error");
00344 delete bundle;
00345 return;
00346 }
00347
00348 if (!complete) {
00349 log_err("process_data: incomplete bundle");
00350 delete bundle;
00351 return;
00352 }
00353
00354 log_debug("process_data: new bundle id %d arrival, length %zu (payload %zu)",
00355 bundle->bundleid_, len, bundle->payload_.length());
00356
00357 BundleDaemon::post(
00358 new BundleReceivedEvent(bundle, EVENTSRC_PEER, len));
00359 }
00360
00361
00362 void
00363 UDPConvergenceLayer::Receiver::run()
00364 {
00365 int ret;
00366 in_addr_t addr;
00367 u_int16_t port;
00368 u_char buf[MAX_UDP_PACKET];
00369
00370 while (1) {
00371 if (should_stop())
00372 break;
00373
00374 ret = recvfrom((char*)buf, MAX_UDP_PACKET, 0, &addr, &port);
00375 if (ret <= 0) {
00376 if (errno == EINTR) {
00377 continue;
00378 }
00379 log_err("error in recvfrom(): %d %s",
00380 errno, strerror(errno));
00381 close();
00382 break;
00383 }
00384
00385 log_debug("got %d byte packet from %s:%d",
00386 ret, intoa(addr), port);
00387 process_data(buf, ret);
00388 }
00389 }
00390
00391
00392 UDPConvergenceLayer::Sender::Sender(const ContactRef& contact)
00393 : Logger("UDPConvergenceLayer::Sender",
00394 "/dtn/cl/udp/sender/%p", this),
00395 socket_(logpath_),
00396 rate_socket_(logpath_, 0, 0),
00397 contact_(contact.object(), "UDPCovergenceLayer::Sender")
00398 {
00399 }
00400
00401
00402 bool
00403 UDPConvergenceLayer::Sender::init(Params* params,
00404 in_addr_t addr, u_int16_t port)
00405
00406 {
00407 log_debug("initializing sender");
00408
00409 params_ = params;
00410
00411 socket_.logpathf("%s/conn/%s:%d", logpath_, intoa(addr), port);
00412 socket_.set_logfd(false);
00413
00414 if (params->local_addr_ != INADDR_NONE || params->local_port_ != 0)
00415 {
00416 if (socket_.bind(params->local_addr_, params->local_port_) != 0) {
00417 log_err("error binding to %s:%d: %s",
00418 intoa(params->local_addr_), params->local_port_,
00419 strerror(errno));
00420 return false;
00421 }
00422 }
00423
00424 if (socket_.connect(addr, port) != 0) {
00425 log_err("error issuing udp connect to %s:%d: %s",
00426 intoa(addr), port, strerror(errno));
00427 return false;
00428 }
00429
00430 if (params->rate_ != 0) {
00431 rate_socket_.bucket()->set_rate(params->rate_);
00432
00433 if (params->bucket_depth_ != 0) {
00434 rate_socket_.bucket()->set_depth(params->bucket_depth_);
00435 }
00436
00437 log_debug("initialized rate controller: rate %u depth %u",
00438 rate_socket_.bucket()->rate(),
00439 rate_socket_.bucket()->depth());
00440 }
00441
00442 return true;
00443 }
00444
00445
00446 int
00447 UDPConvergenceLayer::Sender::send_bundle(Bundle* bundle)
00448 {
00449 BlockInfoVec* blocks = bundle->xmit_blocks_.find_blocks(contact_->link());
00450 ASSERT(blocks != NULL);
00451
00452 bool complete = false;
00453 size_t total_len = BundleProtocol::produce(bundle, blocks,
00454 buf_, 0, sizeof(buf_),
00455 &complete);
00456 if (!complete) {
00457 size_t formatted_len = BundleProtocol::total_length(blocks);
00458 log_err("send_bundle: bundle too big (%zu > %u)",
00459 formatted_len, UDPConvergenceLayer::MAX_BUNDLE_LEN);
00460 return -1;
00461 }
00462
00463
00464 int cc = socket_.write((char*)buf_, total_len);
00465 if (cc == (int)total_len) {
00466 log_info("send_bundle: successfully sent bundle length %d", cc);
00467 return total_len;
00468 } else {
00469 log_err("send_bundle: error sending bundle (wrote %d/%zu): %s",
00470 cc, total_len, strerror(errno));
00471 return -1;
00472 }
00473 }
00474
00475 }