00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #ifdef __linux__
00020
00021 #include <sys/poll.h>
00022 #include <stdlib.h>
00023 #include <sys/types.h>
00024 #include <sys/socket.h>
00025 #include <netinet/in.h>
00026 #include <net/ethernet.h>
00027 #include <netpacket/packet.h>
00028 #include <sys/ioctl.h>
00029
00030 #include <oasys/io/NetUtils.h>
00031 #include <oasys/io/IO.h>
00032 #include <oasys/thread/Timer.h>
00033 #include <oasys/util/OptParser.h>
00034 #include <oasys/util/URL.h>
00035 #include <oasys/util/StringBuffer.h>
00036
00037 #include "EthConvergenceLayer.h"
00038 #include "bundling/Bundle.h"
00039 #include "bundling/BundleEvent.h"
00040 #include "bundling/BundleDaemon.h"
00041 #include "bundling/BundleList.h"
00042 #include "bundling/BundleProtocol.h"
00043 #include "contacts/ContactManager.h"
00044 #include "contacts/Link.h"
00045
00046 using namespace oasys;
00047 namespace dtn {
00048
00049 struct EthConvergenceLayer::Params EthConvergenceLayer::defaults_;
00050
00051
00052
00053
00054
00055
00056
00057 EthConvergenceLayer::EthConvergenceLayer()
00058 : ConvergenceLayer("EthConvergenceLayer", "eth")
00059 {
00060 defaults_.beacon_interval_ = 1;
00061 }
00062
00066 bool
00067 EthConvergenceLayer::parse_params(Params* params,
00068 int argc, const char* argv[],
00069 const char** invalidp)
00070 {
00071 oasys::OptParser p;
00072
00073 p.addopt(new oasys::UIntOpt("beacon_interval", ¶ms->beacon_interval_));
00074
00075 if (! p.parse(argc, argv, invalidp)) {
00076 return false;
00077 }
00078
00079 return true;
00080 }
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093 bool
00094 EthConvergenceLayer::interface_up(Interface* iface,
00095 int argc, const char* argv[])
00096 {
00097 Params params = EthConvergenceLayer::defaults_;
00098 const char *invalid;
00099 if (!parse_params(¶ms, argc, argv, &invalid)) {
00100 log_err("error parsing interface options: invalid option '%s'",
00101 invalid);
00102 return false;
00103 }
00104
00105
00106
00107
00108
00109 const char* if_name=iface->name().c_str()+strlen("string://");
00110 log_info("EthConvergenceLayer::interface_up(%s).", if_name);
00111
00112 Receiver* receiver = new Receiver(if_name, ¶ms);
00113 receiver->logpathf("/cl/eth");
00114 receiver->start();
00115 iface->set_cl_info(receiver);
00116
00117
00118 if_beacon_ = new Beacon(if_name, params.beacon_interval_);
00119 if_beacon_->logpathf("/cl/eth");
00120 if_beacon_->start();
00121
00122 return true;
00123 }
00124
00125 bool
00126 EthConvergenceLayer::interface_down(Interface* iface)
00127 {
00128
00129
00130
00131
00132
00133 if_beacon_->set_should_stop();
00134 while (! if_beacon_->is_stopped()) {
00135 oasys::Thread::yield();
00136 }
00137 delete if_beacon_;
00138
00139 Receiver *receiver = (Receiver *)iface->cl_info();
00140 receiver->set_should_stop();
00141
00142 while (! receiver->is_stopped()) {
00143 oasys::Thread::yield();
00144 }
00145 delete receiver;
00146
00147 return true;
00148 }
00149
00150 bool
00151 EthConvergenceLayer::open_contact(const ContactRef& contact)
00152 {
00153 eth_addr_t addr;
00154
00155 Link* link = contact->link();
00156 log_debug("opening contact to link *%p", link);
00157
00158
00159 if (! EthernetScheme::parse(link->nexthop(), &addr)) {
00160 log_err("next hop address '%s' not a valid eth uri",
00161 link->nexthop());
00162 return false;
00163 }
00164
00165
00166 Sender* sender = new Sender(((EthCLInfo*)link->cl_info())->if_name_,
00167 link->contact());
00168 contact->set_cl_info(sender);
00169
00170 sender->logpathf("/cl/eth");
00171
00172 BundleDaemon::post(new ContactUpEvent(contact));
00173 return true;
00174 }
00175
00176 bool
00177 EthConvergenceLayer::close_contact(const ContactRef& contact)
00178 {
00179 Sender* sender = (Sender*)contact->cl_info();
00180
00181 log_info("close_contact *%p", contact.object());
00182
00183 if (sender) {
00184 contact->set_cl_info(NULL);
00185 delete sender;
00186 }
00187
00188 return true;
00189 }
00190
00194 void
00195 EthConvergenceLayer::send_bundle(const ContactRef& contact, Bundle* bundle)
00196 {
00197 Sender* sender = (Sender*)contact->cl_info();
00198 if (!sender) {
00199 log_crit("send_bundles called on contact *%p with no Sender!!",
00200 contact.object());
00201 return;
00202 }
00203 ASSERT(contact == sender->contact_);
00204
00205 sender->send_bundle(bundle);
00206 }
00207
00208
00209
00210
00211
00212
00213
00214 EthConvergenceLayer::Receiver::Receiver(const char* if_name,
00215 EthConvergenceLayer::Params* params)
00216 : Logger("EthConvergenceLayer::Receiver", "/dtn/cl/eth/receiver"),
00217 Thread("EthConvergenceLayer::Receiver")
00218 {
00219 memset(if_name_,0, IFNAMSIZ);
00220 strcpy(if_name_,if_name);
00221 Thread::flags_ |= INTERRUPTABLE;
00222 (void)params;
00223 }
00224
00225 void
00226 EthConvergenceLayer::Receiver::process_data(u_char* bp, size_t len)
00227 {
00228 Bundle* bundle = NULL;
00229 EthCLHeader ethclhdr;
00230 size_t bundle_len;
00231 struct ether_header* ethhdr=(struct ether_header*)bp;
00232
00233 log_debug("Received DTN packet on interface %s, %zu.",if_name_, len);
00234
00235
00236 if (len < sizeof(EthCLHeader)) {
00237 log_err("process_data: "
00238 "incoming packet too small (len = %zu)", len);
00239 return;
00240 }
00241 memcpy(ðclhdr, bp+sizeof(struct ether_header), sizeof(EthCLHeader));
00242
00243
00244 if (ethclhdr.version != ETHCL_VERSION) {
00245 log_warn("remote sent version %d, expected version %d "
00246 "-- disconnecting.", ethclhdr.version, ETHCL_VERSION);
00247 return;
00248 }
00249
00250 if(ethclhdr.type == ETHCL_BEACON) {
00251 ContactManager* cm = BundleDaemon::instance()->contactmgr();
00252
00253 char bundles_string[60];
00254 memset(bundles_string,0,60);
00255 EthernetScheme::to_string(ðhdr->ether_shost[0],
00256 bundles_string);
00257 char next_hop_string[50], *ptr;
00258 memset(next_hop_string,0,50);
00259 ptr = strrchr(bundles_string, '/');
00260 strcpy(next_hop_string, ptr+1);
00261
00262 ConvergenceLayer* cl = ConvergenceLayer::find_clayer("eth");
00263 EndpointID remote_eid(bundles_string);
00264
00265 Link* link=cm->find_link_to(cl,
00266 next_hop_string,
00267 remote_eid,
00268 Link::OPPORTUNISTIC);
00269
00270 if(!link)
00271 {
00272 log_info("Discovered next_hop %s on interface %s.",
00273 next_hop_string, if_name_);
00274
00275
00276 link=cm->new_opportunistic_link(
00277 cl,
00278 next_hop_string,
00279 EndpointID(bundles_string));
00280
00281
00282 if (link->cl_info() == NULL) {
00283 link->set_cl_info(new EthCLInfo(if_name_));
00284 } else {
00285 ASSERT(strcmp(((EthCLInfo*)link->cl_info())->if_name_,
00286 if_name_) == 0);
00287 }
00288 }
00289
00290 if(!link->isavailable())
00291 {
00292 log_info("Got beacon for previously unavailable link");
00293
00294
00295 log_err("XXx/demmer do something about link availability");
00296 }
00297
00303 BeaconTimer *timer = ((EthCLInfo*)link->cl_info())->timer;
00304 if (timer)
00305 timer->cancel();
00306
00307 timer = new BeaconTimer(next_hop_string);
00308 timer->schedule_in(ETHCL_BEACON_TIMEOUT_INTERVAL);
00309
00310 ((EthCLInfo*)link->cl_info())->timer=timer;
00311 }
00312 else if(ethclhdr.type == ETHCL_BUNDLE) {
00313
00314
00315 bundle_len = len - sizeof(EthCLHeader) - sizeof(struct ether_header);
00316
00317 log_debug("process_data: got ethcl header -- bundle id %d, length %zu",
00318 ntohl(ethclhdr.bundle_id), bundle_len);
00319
00320
00321 bp += (sizeof(EthCLHeader) + sizeof(struct ether_header));
00322 len -= (sizeof(EthCLHeader) + sizeof(struct ether_header));
00323
00324 bundle = new Bundle();
00325 bool complete = false;
00326 int cc = BundleProtocol::consume(bundle, bp, len, &complete);
00327
00328 if (cc < 0) {
00329 log_err("process_data: bundle protocol error");
00330 delete bundle;
00331 return;
00332 }
00333
00334 if (!complete) {
00335 log_err("process_data: incomplete bundle");
00336 delete bundle;
00337 return;
00338 }
00339
00340 log_debug("process_data: new bundle id %d arrival, bundle length %zu",
00341 bundle->bundleid_, bundle_len);
00342
00343 BundleDaemon::post(
00344 new BundleReceivedEvent(bundle, EVENTSRC_PEER, bundle_len));
00345 }
00346 }
00347
00348 void
00349 EthConvergenceLayer::Receiver::run()
00350 {
00351 int sock;
00352 int cc;
00353 struct sockaddr_ll iface;
00354 unsigned char buffer[MAX_ETHER_PACKET];
00355
00356 if((sock = socket(PF_PACKET,SOCK_RAW, htons(ETHERTYPE_DTN))) < 0) {
00357 perror("socket");
00358 log_err("EthConvergenceLayer::Receiver::run() "
00359 "Couldn't open socket.");
00360 exit(1);
00361 }
00362
00363
00364 struct ifreq req;
00365 strcpy(req.ifr_name, if_name_);
00366 ioctl(sock, SIOCGIFINDEX, &req);
00367
00368 memset(&iface, 0, sizeof(iface));
00369 iface.sll_family=AF_PACKET;
00370 iface.sll_protocol=htons(ETHERTYPE_DTN);
00371 iface.sll_ifindex=req.ifr_ifindex;
00372
00373 if (bind(sock, (struct sockaddr *) &iface, sizeof(iface)) == -1) {
00374 perror("bind");
00375 exit(1);
00376 }
00377
00378 log_warn("Reading from socket...");
00379 while(true) {
00380 cc=read (sock, buffer, MAX_ETHER_PACKET);
00381 if(cc<=0) {
00382 perror("EthConvergenceLayer::Receiver::run()");
00383 exit(1);
00384 }
00385 struct ether_header* hdr=(struct ether_header*)buffer;
00386
00387 if(ntohs(hdr->ether_type)==ETHERTYPE_DTN) {
00388 process_data(buffer, cc);
00389 }
00390 else if(ntohs(hdr->ether_type)!=0x800)
00391 {
00392 log_err("Got non-DTN packet in Receiver, type %4X.",
00393 ntohs(hdr->ether_type));
00394
00395 }
00396
00397 if(should_stop())
00398 break;
00399 }
00400 }
00401
00402
00403
00404
00405
00406
00407
00411 EthConvergenceLayer::Sender::Sender(char* if_name,
00412 const ContactRef& contact)
00413 : Logger("EthConvergenceLayer::Sender", "/dtn/cl/eth/sender"),
00414 contact_(contact.object(), "EthConvergenceLayer::Sender")
00415 {
00416 struct ifreq req;
00417 struct sockaddr_ll iface;
00418 Link *link = contact->link();
00419
00420 memset(src_hw_addr_.octet, 0, 6);
00421 EthernetScheme::parse(link->nexthop(), &dst_hw_addr_);
00422
00423 strcpy(if_name_, if_name);
00424 sock_ = 0;
00425
00426 memset(&req, 0, sizeof(req));
00427 memset(&iface, 0, sizeof(iface));
00428
00429
00430
00431
00432 if((sock_ = socket(AF_PACKET,SOCK_RAW, htons(ETHERTYPE_DTN))) < 0) {
00433 perror("socket");
00434 exit(1);
00435 }
00436
00437
00438 strcpy(req.ifr_name, if_name_);
00439
00440
00441 ioctl(sock_, SIOCGIFINDEX, &req);
00442
00443 iface.sll_family=AF_PACKET;
00444 iface.sll_protocol=htons(ETHERTYPE_DTN);
00445 iface.sll_ifindex=req.ifr_ifindex;
00446
00447
00448 if(ioctl(sock_, SIOCGIFHWADDR, &req))
00449 {
00450 perror("ioctl");
00451 exit(1);
00452 }
00453 memcpy(src_hw_addr_.octet,req.ifr_hwaddr.sa_data,6);
00454
00455 if (bind(sock_, (struct sockaddr *) &iface, sizeof(iface)) == -1) {
00456 perror("bind");
00457 exit(1);
00458 }
00459 }
00460
00461
00462
00463
00464 bool
00465 EthConvergenceLayer::Sender::send_bundle(Bundle* bundle)
00466 {
00467 int cc;
00468 struct iovec iov[3];
00469
00470 EthCLHeader ethclhdr;
00471 struct ether_header hdr;
00472
00473 memset(iov,0,sizeof(iov));
00474
00475
00476
00477 iov[0].iov_base = (char*)&hdr;
00478 iov[0].iov_len = sizeof(struct ether_header);
00479
00480
00481
00482 memcpy(hdr.ether_dhost,dst_hw_addr_.octet,6);
00483 memcpy(hdr.ether_shost,src_hw_addr_.octet,6);
00484 hdr.ether_type=htons(ETHERTYPE_DTN);
00485
00486
00487
00488 iov[1].iov_base = (char*)ðclhdr;
00489 iov[1].iov_len = sizeof(EthCLHeader);
00490
00491
00492
00493 ethclhdr.version = ETHCL_VERSION;
00494 ethclhdr.type = ETHCL_BUNDLE;
00495 ethclhdr.bundle_id = htonl(bundle->bundleid_);
00496
00497
00498 BlockInfoVec* blocks = bundle->xmit_blocks_.find_blocks(contact_->link());
00499 ASSERT(blocks != NULL);
00500
00501 bool complete = false;
00502 size_t total_len = BundleProtocol::produce(bundle, blocks,
00503 buf_, 0, sizeof(buf_),
00504 &complete);
00505 if (!complete) {
00506 size_t formatted_len = BundleProtocol::total_length(blocks);
00507 log_err("send_bundle: bundle too big (%zu > %u)",
00508 formatted_len, MAX_ETHER_PACKET);
00509 return -1;
00510 }
00511
00512 iov[2].iov_base = (char *)buf_;
00513 iov[2].iov_len = total_len;
00514
00515
00516
00517 log_info("Sending bundle out interface %s",if_name_);
00518
00519 cc=IO::writevall(sock_, iov, 3);
00520 if(cc<0) {
00521 perror("send");
00522 log_err("Send failed!\n");
00523 }
00524 log_info("Sent packet, size: %d",cc );
00525
00526
00527 bool ok;
00528 int total = sizeof(EthCLHeader) + sizeof(struct ether_header) + total_len;
00529 if (cc != total) {
00530 BundleDaemon::post(new BundleTransmitFailedEvent(bundle,
00531 contact_,
00532 contact_->link()));
00533 log_err("send_bundle: error writing bundle (wrote %d/%d): %s",
00534 cc, total, strerror(errno));
00535 ok = false;
00536 } else {
00537
00538
00539
00540 BundleDaemon::post(
00541 new BundleTransmittedEvent(bundle, contact_,contact_->link(),
00542 total_len, false));
00543 ok = true;
00544 }
00545
00546 return ok;
00547 }
00548
00549 EthConvergenceLayer::Beacon::Beacon(const char* if_name,
00550 unsigned int beacon_interval)
00551 : Logger("EthConvergenceLayer::Beacon", "/dtn/cl/eth/beacon"),
00552 Thread("EthConvergenceLayer::Beacon")
00553 {
00554 Thread::flags_ |= INTERRUPTABLE;
00555 memset(if_name_, 0, IFNAMSIZ);
00556 strcpy(if_name_, if_name);
00557 beacon_interval_ = beacon_interval;
00558 }
00559
00560 void EthConvergenceLayer::Beacon::run()
00561 {
00562
00563 char bcast_mac_addr[6]={0xff,0xff,0xff,0xff,0xff,0xff};
00564
00565 struct ether_header hdr;
00566 struct sockaddr_ll iface;
00567 EthCLHeader ethclhdr;
00568
00569 int sock,cc;
00570 struct iovec iov[2];
00571
00572 memset(&hdr,0,sizeof(hdr));
00573 memset(ðclhdr,0,sizeof(ethclhdr));
00574 memset(&iface,0,sizeof(iface));
00575
00576 ethclhdr.version = ETHCL_VERSION;
00577 ethclhdr.type = ETHCL_BEACON;
00578
00579 hdr.ether_type=htons(ETHERTYPE_DTN);
00580
00581
00582 iov[0].iov_base = (char*)&hdr;
00583 iov[0].iov_len = sizeof(struct ether_header);
00584
00585
00586 iov[1].iov_base = (char*)ðclhdr;
00587 iov[1].iov_len = sizeof(EthCLHeader);
00588
00589
00590
00591
00592 if((sock = socket(AF_PACKET,SOCK_RAW, htons(ETHERTYPE_DTN))) < 0) {
00593 perror("socket");
00594 exit(1);
00595 }
00596
00597 struct ifreq req;
00598 strcpy(req.ifr_name, if_name_);
00599 if(ioctl(sock, SIOCGIFINDEX, &req))
00600 {
00601 perror("ioctl");
00602 exit(1);
00603 }
00604
00605 iface.sll_ifindex=req.ifr_ifindex;
00606
00607 if(ioctl(sock, SIOCGIFHWADDR, &req))
00608 {
00609 perror("ioctl");
00610 exit(1);
00611 }
00612
00613 memcpy(hdr.ether_dhost,bcast_mac_addr,6);
00614 memcpy(hdr.ether_shost,req.ifr_hwaddr.sa_data,6);
00615
00616 log_info("Interface %s has interface number %d.",if_name_,req.ifr_ifindex);
00617
00618 iface.sll_family=AF_PACKET;
00619 iface.sll_protocol=htons(ETHERTYPE_DTN);
00620
00621 if (bind(sock, (struct sockaddr *) &iface, sizeof(iface)) == -1) {
00622 perror("bind");
00623 exit(1);
00624 }
00625
00626
00627
00628
00629 while(1) {
00630 sleep(beacon_interval_);
00631
00632 if (should_stop())
00633 break;
00634
00635 log_debug("Sent beacon out interface %s.\n",if_name_ );
00636
00637 cc=IO::writevall(sock, iov, 2);
00638 if(cc<0) {
00639 perror("send beacon");
00640 log_err("Send beacon failed!\n");
00641 }
00642 }
00643 }
00644
00645 EthConvergenceLayer::BeaconTimer::BeaconTimer(char * next_hop)
00646 : Logger("EthConvergenceLayer::BeaconTimer", "/dtn/cl/eth/beacontimer")
00647 {
00648 next_hop_=(char*)malloc(strlen(next_hop)+1);
00649 strcpy(next_hop_, next_hop);
00650 }
00651
00652 EthConvergenceLayer::BeaconTimer::~BeaconTimer()
00653 {
00654 free(next_hop_);
00655 }
00656
00657 void
00658 EthConvergenceLayer::BeaconTimer::timeout(const struct timeval& now)
00659 {
00660 ContactManager* cm = BundleDaemon::instance()->contactmgr();
00661 ConvergenceLayer* cl = ConvergenceLayer::find_clayer("eth");
00662 Link * l = cm->find_link_to(cl, next_hop_);
00663
00664 (void)now;
00665
00666 log_info("Neighbor %s timer expired.",next_hop_);
00667
00668 if(l == 0) {
00669 log_warn("No link for next_hop %s.",next_hop_);
00670 }
00671 else if(l->isopen()) {
00672 BundleDaemon::post(
00673 new LinkStateChangeRequest(l, Link::CLOSED,
00674 ContactDownEvent::BROKEN));
00675 }
00676 else {
00677 log_warn("next_hop %s unexpectedly not open",next_hop_);
00678 }
00679 }
00680
00681 Timer *
00682 EthConvergenceLayer::BeaconTimer::copy()
00683 {
00684 return new BeaconTimer(*this);
00685 }
00686
00687 }
00688
00689 #endif // __linux