00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040 #ifdef __linux__
00041
00042 #include <sys/poll.h>
00043 #include <stdlib.h>
00044 #include <sys/types.h>
00045 #include <sys/socket.h>
00046 #include <netinet/in.h>
00047 #include <net/ethernet.h>
00048 #include <netpacket/packet.h>
00049 #include <sys/ioctl.h>
00050
00051 #include <oasys/io/NetUtils.h>
00052 #include <oasys/io/IO.h>
00053 #include <oasys/thread/Timer.h>
00054 #include <oasys/util/OptParser.h>
00055 #include <oasys/util/URL.h>
00056 #include <oasys/util/StringBuffer.h>
00057
00058 #include "EthConvergenceLayer.h"
00059 #include "bundling/Bundle.h"
00060 #include "bundling/BundleEvent.h"
00061 #include "bundling/BundleDaemon.h"
00062 #include "bundling/BundleList.h"
00063 #include "bundling/BundleProtocol.h"
00064 #include "contacts/ContactManager.h"
00065 #include "contacts/Link.h"
00066
00067 using namespace oasys;
00068 namespace dtn {
00069
00070 struct EthConvergenceLayer::Params EthConvergenceLayer::defaults_;
00071
00072
00073
00074
00075
00076
00077
00078 EthConvergenceLayer::EthConvergenceLayer()
00079 : ConvergenceLayer("EthConvergenceLayer", "eth")
00080 {
00081 defaults_.beacon_interval_ = 1;
00082 }
00083
00087 bool
00088 EthConvergenceLayer::parse_params(Params* params,
00089 int argc, const char* argv[],
00090 const char** invalidp)
00091 {
00092 oasys::OptParser p;
00093
00094 p.addopt(new oasys::UIntOpt("beacon_interval", ¶ms->beacon_interval_));
00095
00096 if (! p.parse(argc, argv, invalidp)) {
00097 return false;
00098 }
00099
00100 return true;
00101 }
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114 bool
00115 EthConvergenceLayer::interface_up(Interface* iface,
00116 int argc, const char* argv[])
00117 {
00118 Params params = EthConvergenceLayer::defaults_;
00119 const char *invalid;
00120 if (!parse_params(¶ms, argc, argv, &invalid)) {
00121 log_err("error parsing interface options: invalid option '%s'",
00122 invalid);
00123 return false;
00124 }
00125
00126
00127
00128
00129
00130 const char* if_name=iface->name().c_str()+strlen("string://");
00131 log_info("EthConvergenceLayer::interface_up(%s).", if_name);
00132
00133 Receiver* receiver = new Receiver(if_name, ¶ms);
00134 receiver->logpathf("/cl/eth");
00135 receiver->start();
00136 iface->set_cl_info(receiver);
00137
00138
00139 if_beacon_ = new Beacon(if_name, params.beacon_interval_);
00140 if_beacon_->logpathf("/cl/eth");
00141 if_beacon_->start();
00142
00143 return true;
00144 }
00145
00146 bool
00147 EthConvergenceLayer::interface_down(Interface* iface)
00148 {
00149
00150
00151
00152
00153
00154 if_beacon_->set_should_stop();
00155 while (! if_beacon_->is_stopped()) {
00156 oasys::Thread::yield();
00157 }
00158 delete if_beacon_;
00159
00160 Receiver *receiver = (Receiver *)iface->cl_info();
00161 receiver->set_should_stop();
00162
00163 while (! receiver->is_stopped()) {
00164 oasys::Thread::yield();
00165 }
00166 delete receiver;
00167
00168 return true;
00169 }
00170
00171 bool
00172 EthConvergenceLayer::open_contact(const ContactRef& contact)
00173 {
00174 eth_addr_t addr;
00175
00176 Link* link = contact->link();
00177 log_debug("opening contact to link *%p", link);
00178
00179
00180 if (! EthernetScheme::parse(link->nexthop(), &addr)) {
00181 log_err("next hop address '%s' not a valid eth uri",
00182 link->nexthop());
00183 return false;
00184 }
00185
00186
00187 Sender* sender = new Sender(((EthCLInfo*)link->cl_info())->if_name_,
00188 link->contact());
00189 contact->set_cl_info(sender);
00190
00191 sender->logpathf("/cl/eth");
00192
00193 BundleDaemon::post(new ContactUpEvent(contact));
00194 return true;
00195 }
00196
00197 bool
00198 EthConvergenceLayer::close_contact(const ContactRef& contact)
00199 {
00200 Sender* sender = (Sender*)contact->cl_info();
00201
00202 log_info("close_contact *%p", contact.object());
00203
00204 if (sender) {
00205 contact->set_cl_info(NULL);
00206 delete sender;
00207 }
00208
00209 return true;
00210 }
00211
00215 void
00216 EthConvergenceLayer::send_bundle(const ContactRef& contact, Bundle* bundle)
00217 {
00218 Sender* sender = (Sender*)contact->cl_info();
00219 if (!sender) {
00220 log_crit("send_bundles called on contact *%p with no Sender!!",
00221 contact.object());
00222 return;
00223 }
00224 ASSERT(contact == sender->contact_);
00225
00226 sender->send_bundle(bundle);
00227 }
00228
00229
00230
00231
00232
00233
00234
00235 EthConvergenceLayer::Receiver::Receiver(const char* if_name,
00236 EthConvergenceLayer::Params* params)
00237 : Logger("EthConvergenceLayer::Receiver", "/dtn/cl/eth/receiver"),
00238 Thread("EthConvergenceLayer::Receiver")
00239 {
00240 memset(if_name_,0, IFNAMSIZ);
00241 strcpy(if_name_,if_name);
00242 Thread::flags_ |= INTERRUPTABLE;
00243 (void)params;
00244 }
00245
00246 void
00247 EthConvergenceLayer::Receiver::process_data(u_char* bp, size_t len)
00248 {
00249 Bundle* bundle = NULL;
00250 EthCLHeader ethclhdr;
00251 size_t header_len, bundle_len;
00252 struct ether_header* ethhdr=(struct ether_header*)bp;
00253
00254 log_debug("Received DTN packet on interface %s, %d.",if_name_, len);
00255
00256
00257 if (len < sizeof(EthCLHeader)) {
00258 log_err("process_data: "
00259 "incoming packet too small (len = %d)", len);
00260 return;
00261 }
00262 memcpy(ðclhdr, bp+sizeof(struct ether_header), sizeof(EthCLHeader));
00263
00264
00265 if (ethclhdr.version != ETHCL_VERSION) {
00266 log_warn("remote sent version %d, expected version %d "
00267 "-- disconnecting.", ethclhdr.version, ETHCL_VERSION);
00268 return;
00269 }
00270
00271 if(ethclhdr.type == ETHCL_BEACON) {
00272 ContactManager* cm = BundleDaemon::instance()->contactmgr();
00273
00274 char bundles_string[60];
00275 memset(bundles_string,0,60);
00276 EthernetScheme::to_string(ðhdr->ether_shost[0],
00277 bundles_string);
00278 char next_hop_string[50], *ptr;
00279 memset(next_hop_string,0,50);
00280 ptr = strrchr(bundles_string, '/');
00281 strcpy(next_hop_string, ptr+1);
00282
00283 ConvergenceLayer* cl = ConvergenceLayer::find_clayer("eth");
00284 EndpointID remote_eid(bundles_string);
00285
00286 Link* link=cm->find_link_to(cl,
00287 next_hop_string,
00288 remote_eid,
00289 Link::OPPORTUNISTIC);
00290
00291 if(!link)
00292 {
00293 log_info("Discovered next_hop %s on interface %s.",
00294 next_hop_string, if_name_);
00295
00296
00297 link=cm->new_opportunistic_link(
00298 cl,
00299 next_hop_string,
00300 EndpointID(bundles_string));
00301
00302
00303 if (link->cl_info() == NULL) {
00304 link->set_cl_info(new EthCLInfo(if_name_));
00305 } else {
00306 ASSERT(strcmp(((EthCLInfo*)link->cl_info())->if_name_,
00307 if_name_) == 0);
00308 }
00309 }
00310
00311 if(!link->isavailable())
00312 {
00313 log_info("Got beacon for previously unavailable link");
00314
00315
00316 log_err("XXx/demmer do something about link availability");
00317 }
00318
00324 BeaconTimer *timer = ((EthCLInfo*)link->cl_info())->timer;
00325 if (timer)
00326 timer->cancel();
00327
00328 timer = new BeaconTimer(next_hop_string);
00329 timer->schedule_in(ETHCL_BEACON_TIMEOUT_INTERVAL);
00330
00331 ((EthCLInfo*)link->cl_info())->timer=timer;
00332 }
00333 else if(ethclhdr.type == ETHCL_BUNDLE) {
00334
00335
00336 bundle_len = len - sizeof(EthCLHeader) - sizeof(struct ether_header);
00337
00338 log_debug("process_data: got ethcl header -- bundle id %d, length %d",
00339 ntohl(ethclhdr.bundle_id), bundle_len);
00340
00341
00342 bp += (sizeof(EthCLHeader) + sizeof(struct ether_header));
00343 len -= (sizeof(EthCLHeader) + sizeof(struct ether_header));
00344
00345
00346
00347
00348 bundle = new Bundle();
00349 header_len =
00350 BundleProtocol::parse_header_blocks(bundle, (u_char*)bp, len);
00351
00352 size_t payload_len = bundle->payload_.length();
00353 if (bundle_len != header_len + payload_len) {
00354 log_err("process_data: error in bundle lengths: "
00355 "bundle_length %d, header_length %d, payload_length %d",
00356 bundle_len, header_len, payload_len);
00357 delete bundle;
00358 return;
00359 }
00360
00361
00362 bp += header_len;
00363 len -= header_len;
00364 bundle->payload_.append_data(bp, len);
00365
00366 log_debug("process_data: new bundle id %d arrival, payload length %d",
00367 bundle->bundleid_, bundle->payload_.length());
00368
00369 BundleDaemon::post(
00370 new BundleReceivedEvent(bundle, EVENTSRC_PEER, len));
00371 }
00372 }
00373
00374 void
00375 EthConvergenceLayer::Receiver::run()
00376 {
00377 int sock;
00378 int cc;
00379 struct sockaddr_ll iface;
00380 unsigned char buffer[MAX_ETHER_PACKET];
00381
00382 if((sock = socket(PF_PACKET,SOCK_RAW, htons(ETHERTYPE_DTN))) < 0) {
00383 perror("socket");
00384 log_err("EthConvergenceLayer::Receiver::run() "
00385 "Couldn't open socket.");
00386 exit(1);
00387 }
00388
00389
00390 struct ifreq req;
00391 strcpy(req.ifr_name, if_name_);
00392 ioctl(sock, SIOCGIFINDEX, &req);
00393
00394 memset(&iface, 0, sizeof(iface));
00395 iface.sll_family=AF_PACKET;
00396 iface.sll_protocol=htons(ETHERTYPE_DTN);
00397 iface.sll_ifindex=req.ifr_ifindex;
00398
00399 if (bind(sock, (struct sockaddr *) &iface, sizeof(iface)) == -1) {
00400 perror("bind");
00401 exit(1);
00402 }
00403
00404 log_warn("Reading from socket...");
00405 while(true) {
00406 cc=read (sock, buffer, MAX_ETHER_PACKET);
00407 if(cc<=0) {
00408 perror("EthConvergenceLayer::Receiver::run()");
00409 exit(1);
00410 }
00411 struct ether_header* hdr=(struct ether_header*)buffer;
00412
00413 if(ntohs(hdr->ether_type)==ETHERTYPE_DTN) {
00414 process_data(buffer, cc);
00415 }
00416 else if(ntohs(hdr->ether_type)!=0x800)
00417 {
00418 log_err("Got non-DTN packet in Receiver, type %4X.",
00419 ntohs(hdr->ether_type));
00420
00421 }
00422
00423 if(should_stop())
00424 break;
00425 }
00426 }
00427
00428
00429
00430
00431
00432
00433
00437 EthConvergenceLayer::Sender::Sender(char* if_name,
00438 const ContactRef& contact)
00439 : Logger("EthConvergenceLayer::Sender", "/dtn/cl/eth/sender"),
00440 contact_(contact.object(), "EthConvergenceLayer::Sender")
00441 {
00442 struct ifreq req;
00443 struct sockaddr_ll iface;
00444 Link *link = contact->link();
00445
00446 memset(src_hw_addr_.octet, 0, 6);
00447 EthernetScheme::parse(link->nexthop(), &dst_hw_addr_);
00448
00449 strcpy(if_name_, if_name);
00450 sock_ = 0;
00451
00452 memset(&req, 0, sizeof(req));
00453 memset(&iface, 0, sizeof(iface));
00454
00455
00456
00457
00458 if((sock_ = socket(AF_PACKET,SOCK_RAW, htons(ETHERTYPE_DTN))) < 0) {
00459 perror("socket");
00460 exit(1);
00461 }
00462
00463
00464 strcpy(req.ifr_name, if_name_);
00465
00466
00467 ioctl(sock_, SIOCGIFINDEX, &req);
00468
00469 iface.sll_family=AF_PACKET;
00470 iface.sll_protocol=htons(ETHERTYPE_DTN);
00471 iface.sll_ifindex=req.ifr_ifindex;
00472
00473
00474 if(ioctl(sock_, SIOCGIFHWADDR, &req))
00475 {
00476 perror("ioctl");
00477 exit(1);
00478 }
00479 memcpy(src_hw_addr_.octet,req.ifr_hwaddr.sa_data,6);
00480
00481 if (bind(sock_, (struct sockaddr *) &iface, sizeof(iface)) == -1) {
00482 perror("bind");
00483 exit(1);
00484 }
00485 }
00486
00487
00488
00489
00490 bool
00491 EthConvergenceLayer::Sender::send_bundle(Bundle* bundle)
00492 {
00493 int cc;
00494 int iovcnt = 1;
00495 struct iovec iov[iovcnt + 3];
00496
00497 EthCLHeader ethclhdr;
00498 struct ether_header hdr;
00499
00500 memset(iov,0,(iovcnt+3)*sizeof(struct iovec));
00501
00502
00503
00504 iov[0].iov_base = (char*)&hdr;
00505 iov[0].iov_len = sizeof(struct ether_header);
00506
00507
00508
00509 memcpy(hdr.ether_dhost,dst_hw_addr_.octet,6);
00510 memcpy(hdr.ether_shost,src_hw_addr_.octet,6);
00511 hdr.ether_type=htons(ETHERTYPE_DTN);
00512
00513
00514
00515 iov[1].iov_base = (char*)ðclhdr;
00516 iov[1].iov_len = sizeof(EthCLHeader);
00517
00518
00519
00520 ethclhdr.version = ETHCL_VERSION;
00521 ethclhdr.type = ETHCL_BUNDLE;
00522 ethclhdr.bundle_id = htonl(bundle->bundleid_);
00523
00524
00525
00526 u_int16_t header_len =
00527 BundleProtocol::format_header_blocks(bundle, buf_, sizeof(buf_));
00528 iov[2].iov_base = (char *)buf_;
00529 iov[2].iov_len = (size_t)header_len;
00530
00531 size_t payload_len = bundle->payload_.length();
00532
00533 log_info("send_bundle: bundle id %d, header_length %d payload_length %d",
00534 bundle->bundleid_, header_len, payload_len);
00535
00536 oasys::StringBuffer payload_buf(payload_len);
00537 const u_char* payload_data =
00538 bundle->payload_.read_data(0, payload_len, (u_char*)payload_buf.data());
00539
00540 iov[iovcnt + 2].iov_base = (char*)payload_data;
00541 iov[iovcnt + 2].iov_len = payload_len;
00542
00543
00544
00545 log_info("Sending bundle out interface %s",if_name_);
00546
00547 cc=IO::writevall(sock_, iov, iovcnt+3);
00548 if(cc<0) {
00549 perror("send");
00550 log_err("Send failed!\n");
00551 }
00552 log_info("Sent packet, size: %d",cc );
00553
00554
00555 bool ok;
00556
00557 int total = sizeof(EthCLHeader) + sizeof(struct ether_header) +
00558 header_len + payload_len;
00559 if (cc != total) {
00560 BundleDaemon::post(new BundleTransmitFailedEvent(bundle, contact_));
00561 log_err("send_bundle: error writing bundle (wrote %d/%d): %s",
00562 cc, total, strerror(errno));
00563 ok = false;
00564 } else {
00565
00566
00567
00568 BundleDaemon::post(
00569 new BundleTransmittedEvent(bundle, contact_,
00570 bundle->payload_.length(), false));
00571 ok = true;
00572 }
00573
00574 return ok;
00575 }
00576
00577 EthConvergenceLayer::Beacon::Beacon(const char* if_name,
00578 unsigned int beacon_interval)
00579 : Logger("EthConvergenceLayer::Beacon", "/dtn/cl/eth/beacon"),
00580 Thread("EthConvergenceLayer::Beacon")
00581 {
00582 Thread::flags_ |= INTERRUPTABLE;
00583 memset(if_name_, 0, IFNAMSIZ);
00584 strcpy(if_name_, if_name);
00585 beacon_interval_ = beacon_interval;
00586 }
00587
00588 void EthConvergenceLayer::Beacon::run()
00589 {
00590
00591 char bcast_mac_addr[6]={0xff,0xff,0xff,0xff,0xff,0xff};
00592
00593 struct ether_header hdr;
00594 struct sockaddr_ll iface;
00595 EthCLHeader ethclhdr;
00596
00597 int sock,cc;
00598 struct iovec iov[2];
00599
00600 memset(&hdr,0,sizeof(hdr));
00601 memset(ðclhdr,0,sizeof(ethclhdr));
00602 memset(&iface,0,sizeof(iface));
00603
00604 ethclhdr.version = ETHCL_VERSION;
00605 ethclhdr.type = ETHCL_BEACON;
00606
00607 hdr.ether_type=htons(ETHERTYPE_DTN);
00608
00609
00610 iov[0].iov_base = (char*)&hdr;
00611 iov[0].iov_len = sizeof(struct ether_header);
00612
00613
00614 iov[1].iov_base = (char*)ðclhdr;
00615 iov[1].iov_len = sizeof(EthCLHeader);
00616
00617
00618
00619
00620 if((sock = socket(AF_PACKET,SOCK_RAW, htons(ETHERTYPE_DTN))) < 0) {
00621 perror("socket");
00622 exit(1);
00623 }
00624
00625 struct ifreq req;
00626 strcpy(req.ifr_name, if_name_);
00627 if(ioctl(sock, SIOCGIFINDEX, &req))
00628 {
00629 perror("ioctl");
00630 exit(1);
00631 }
00632
00633 iface.sll_ifindex=req.ifr_ifindex;
00634
00635 if(ioctl(sock, SIOCGIFHWADDR, &req))
00636 {
00637 perror("ioctl");
00638 exit(1);
00639 }
00640
00641 memcpy(hdr.ether_dhost,bcast_mac_addr,6);
00642 memcpy(hdr.ether_shost,req.ifr_hwaddr.sa_data,6);
00643
00644 log_info("Interface %s has interface number %d.",if_name_,req.ifr_ifindex);
00645
00646 iface.sll_family=AF_PACKET;
00647 iface.sll_protocol=htons(ETHERTYPE_DTN);
00648
00649 if (bind(sock, (struct sockaddr *) &iface, sizeof(iface)) == -1) {
00650 perror("bind");
00651 exit(1);
00652 }
00653
00654
00655
00656
00657 while(1) {
00658 sleep(beacon_interval_);
00659
00660 if (should_stop())
00661 break;
00662
00663 log_debug("Sent beacon out interface %s.\n",if_name_ );
00664
00665 cc=IO::writevall(sock, iov, 2);
00666 if(cc<0) {
00667 perror("send beacon");
00668 log_err("Send beacon failed!\n");
00669 }
00670 }
00671 }
00672
00673 EthConvergenceLayer::BeaconTimer::BeaconTimer(char * next_hop)
00674 : Logger("EthConvergenceLayer::BeaconTimer", "/dtn/cl/eth/beacontimer")
00675 {
00676 next_hop_=(char*)malloc(strlen(next_hop)+1);
00677 strcpy(next_hop_, next_hop);
00678 }
00679
00680 EthConvergenceLayer::BeaconTimer::~BeaconTimer()
00681 {
00682 free(next_hop_);
00683 }
00684
00685 void
00686 EthConvergenceLayer::BeaconTimer::timeout(const struct timeval& now)
00687 {
00688 ContactManager* cm = BundleDaemon::instance()->contactmgr();
00689 ConvergenceLayer* cl = ConvergenceLayer::find_clayer("eth");
00690 Link * l = cm->find_link_to(cl, next_hop_);
00691
00692 (void)now;
00693
00694 log_info("Neighbor %s timer expired.",next_hop_);
00695
00696 if(l == 0) {
00697 log_warn("No link for next_hop %s.",next_hop_);
00698 }
00699 else if(l->isopen()) {
00700 BundleDaemon::post(
00701 new LinkStateChangeRequest(l, Link::CLOSED,
00702 ContactDownEvent::BROKEN));
00703 }
00704 else {
00705 log_warn("next_hop %s unexpectedly not open",next_hop_);
00706 }
00707 }
00708
00709 Timer *
00710 EthConvergenceLayer::BeaconTimer::copy()
00711 {
00712 return new BeaconTimer(*this);
00713 }
00714
00715 }
00716
00717 #endif // __linux