EthConvergenceLayer.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2004-2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 
00018 // Only works on Linux (for now)
00019 #ifdef __linux__
00020 
00021 #include <sys/poll.h>
00022 #include <stdlib.h>
00023 #include <sys/types.h>
00024 #include <sys/socket.h>
00025 #include <netinet/in.h>
00026 #include <net/ethernet.h>
00027 #include <netpacket/packet.h>
00028 #include <sys/ioctl.h>
00029 
00030 #include <oasys/io/NetUtils.h>
00031 #include <oasys/io/IO.h>
00032 #include <oasys/thread/Timer.h>
00033 #include <oasys/util/OptParser.h>
00034 #include <oasys/util/URL.h>
00035 #include <oasys/util/StringBuffer.h>
00036 
00037 #include "EthConvergenceLayer.h"
00038 #include "bundling/Bundle.h"
00039 #include "bundling/BundleEvent.h"
00040 #include "bundling/BundleDaemon.h"
00041 #include "bundling/BundleList.h"
00042 #include "bundling/BundleProtocol.h"
00043 #include "contacts/ContactManager.h"
00044 #include "contacts/Link.h"
00045 
00046 using namespace oasys;
00047 namespace dtn {
00048 
00049 struct EthConvergenceLayer::Params EthConvergenceLayer::defaults_;
00050 
00051 /******************************************************************************
00052  *
00053  * EthConvergenceLayer
00054  *
00055  *****************************************************************************/
00056 
00057 EthConvergenceLayer::EthConvergenceLayer()
00058     : ConvergenceLayer("EthConvergenceLayer", "eth")
00059 {
00060     defaults_.beacon_interval_          = 1;
00061 }
00062 
00066 bool
00067 EthConvergenceLayer::parse_params(Params* params,
00068                                   int argc, const char* argv[],
00069                                   const char** invalidp)
00070 {
00071     oasys::OptParser p;
00072 
00073     p.addopt(new oasys::UIntOpt("beacon_interval", &params->beacon_interval_));
00074 
00075     if (! p.parse(argc, argv, invalidp)) {
00076         return false;
00077     }
00078 
00079     return true;
00080 }
00081 
00082 /* 
00083  *   Start listening to, and sending beacons on, the provided interface.
00084  *
00085  *   For now, we support interface strings of the form
00086  *   string://eth0
00087  *   
00088  *   this should change further down the line to simply be
00089  *    eth0
00090  *  
00091  */
00092 
00093 bool
00094 EthConvergenceLayer::interface_up(Interface* iface,
00095                                   int argc, const char* argv[])
00096 {
00097     Params params = EthConvergenceLayer::defaults_;
00098     const char *invalid;
00099     if (!parse_params(&params, argc, argv, &invalid)) {
00100         log_err("error parsing interface options: invalid option '%s'",
00101                 invalid);
00102         return false;
00103     }
00104 
00105     // grab the interface name out of the string:// 
00106 
00107     // XXX/jakob - this fugly mess needs to change when we get the
00108     // config stuff right
00109     const char* if_name=iface->name().c_str()+strlen("string://");
00110     log_info("EthConvergenceLayer::interface_up(%s).", if_name);
00111     
00112     Receiver* receiver = new Receiver(if_name, &params);
00113     receiver->logpathf("/cl/eth");
00114     receiver->start();
00115     iface->set_cl_info(receiver);
00116 
00117     // remembers the interface beacon object
00118     if_beacon_ = new Beacon(if_name, params.beacon_interval_);
00119     if_beacon_->logpathf("/cl/eth");
00120     if_beacon_->start();
00121     
00122     return true;
00123 }
00124 
00125 bool
00126 EthConvergenceLayer::interface_down(Interface* iface)
00127 {
00128   // XXX/jakob - need to keep track of the Beacon and Receiver threads for each 
00129   //             interface and kill them.
00130   // NOTIMPLEMENTED;
00131 
00132     // xxx/shawn needs to find a way to delete beacon;
00133     if_beacon_->set_should_stop();
00134     while (! if_beacon_->is_stopped()) {
00135         oasys::Thread::yield();
00136     }
00137     delete if_beacon_;
00138 
00139     Receiver *receiver = (Receiver *)iface->cl_info();
00140     receiver->set_should_stop();
00141     // receiver->interrupt_from_io();
00142     while (! receiver->is_stopped()) {
00143         oasys::Thread::yield();
00144     }
00145     delete receiver;
00146 
00147     return true;
00148 }
00149 
00150 bool
00151 EthConvergenceLayer::open_contact(const ContactRef& contact)
00152 {
00153     eth_addr_t addr;
00154 
00155     Link* link = contact->link();
00156     log_debug("opening contact to link *%p", link);
00157 
00158     // parse out the address from the contact nexthop
00159     if (! EthernetScheme::parse(link->nexthop(), &addr)) {
00160         log_err("next hop address '%s' not a valid eth uri",
00161                 link->nexthop());
00162         return false;
00163     }
00164     
00165     // create a new connection for the contact
00166     Sender* sender = new Sender(((EthCLInfo*)link->cl_info())->if_name_,
00167                                 link->contact());
00168     contact->set_cl_info(sender);
00169 
00170     sender->logpathf("/cl/eth");
00171 
00172     BundleDaemon::post(new ContactUpEvent(contact));
00173     return true;
00174 }
00175 
00176 bool
00177 EthConvergenceLayer::close_contact(const ContactRef& contact)
00178 {  
00179     Sender* sender = (Sender*)contact->cl_info();
00180     
00181     log_info("close_contact *%p", contact.object());
00182 
00183     if (sender) {            
00184         contact->set_cl_info(NULL);
00185         delete sender;
00186     }
00187     
00188     return true;
00189 }
00190 
00194 void
00195 EthConvergenceLayer::send_bundle(const ContactRef& contact, Bundle* bundle)
00196 {
00197     Sender* sender = (Sender*)contact->cl_info();
00198     if (!sender) {
00199         log_crit("send_bundles called on contact *%p with no Sender!!",
00200                  contact.object());
00201         return;
00202     }
00203     ASSERT(contact == sender->contact_);
00204     
00205     sender->send_bundle(bundle); // consumes bundle reference
00206 }
00207 
00208 
00209 /******************************************************************************
00210  *
00211  * EthConvergenceLayer::Receiver
00212  *
00213  *****************************************************************************/
00214 EthConvergenceLayer::Receiver::Receiver(const char* if_name,
00215                                         EthConvergenceLayer::Params* params)
00216   : Logger("EthConvergenceLayer::Receiver", "/dtn/cl/eth/receiver"),
00217     Thread("EthConvergenceLayer::Receiver")
00218 {
00219     memset(if_name_,0, IFNAMSIZ);
00220     strcpy(if_name_,if_name);
00221     Thread::flags_ |= INTERRUPTABLE;
00222     (void)params;
00223 }
00224 
00225 void
00226 EthConvergenceLayer::Receiver::process_data(u_char* bp, size_t len)
00227 {
00228     Bundle* bundle = NULL;       
00229     EthCLHeader ethclhdr;
00230     size_t bundle_len;
00231     struct ether_header* ethhdr=(struct ether_header*)bp;
00232     
00233     log_debug("Received DTN packet on interface %s, %zu.",if_name_, len);    
00234 
00235     // copy in the ethcl header.
00236     if (len < sizeof(EthCLHeader)) {
00237         log_err("process_data: "
00238                 "incoming packet too small (len = %zu)", len);
00239         return;
00240     }
00241     memcpy(&ethclhdr, bp+sizeof(struct ether_header), sizeof(EthCLHeader));
00242 
00243     // check for valid magic number and version
00244     if (ethclhdr.version != ETHCL_VERSION) {
00245         log_warn("remote sent version %d, expected version %d "
00246                  "-- disconnecting.", ethclhdr.version, ETHCL_VERSION);
00247         return;
00248     }
00249 
00250     if(ethclhdr.type == ETHCL_BEACON) {
00251         ContactManager* cm = BundleDaemon::instance()->contactmgr();
00252 
00253         char bundles_string[60];
00254         memset(bundles_string,0,60);
00255         EthernetScheme::to_string(&ethhdr->ether_shost[0],
00256                                   bundles_string);
00257         char next_hop_string[50], *ptr;
00258         memset(next_hop_string,0,50);
00259         ptr = strrchr(bundles_string, '/');
00260         strcpy(next_hop_string, ptr+1);
00261         
00262         ConvergenceLayer* cl = ConvergenceLayer::find_clayer("eth");
00263         EndpointID remote_eid(bundles_string);
00264 
00265         Link* link=cm->find_link_to(cl,
00266                                     next_hop_string,
00267                                     remote_eid,
00268                                     Link::OPPORTUNISTIC);
00269         
00270         if(!link)
00271         {
00272             log_info("Discovered next_hop %s on interface %s.",
00273                      next_hop_string, if_name_);
00274             
00275             // registers a new contact with the routing layer
00276             link=cm->new_opportunistic_link(
00277                 cl,
00278                 next_hop_string,
00279                 EndpointID(bundles_string));
00280             
00281             // XXX/demmer I'm not sure about the following
00282             if (link->cl_info() == NULL) {
00283                 link->set_cl_info(new EthCLInfo(if_name_));
00284             } else {
00285                 ASSERT(strcmp(((EthCLInfo*)link->cl_info())->if_name_,
00286                               if_name_) == 0);
00287             }
00288         }
00289 
00290         if(!link->isavailable())
00291         {
00292             log_info("Got beacon for previously unavailable link");
00293             
00294             // XXX/demmer something should be done here to kick the link...
00295             log_err("XXx/demmer do something about link availability");
00296         }
00297         
00303         BeaconTimer *timer = ((EthCLInfo*)link->cl_info())->timer;
00304         if (timer)
00305             timer->cancel();
00306 
00307         timer = new BeaconTimer(next_hop_string); 
00308         timer->schedule_in(ETHCL_BEACON_TIMEOUT_INTERVAL);
00309         
00310         ((EthCLInfo*)link->cl_info())->timer=timer;
00311     }
00312     else if(ethclhdr.type == ETHCL_BUNDLE) {
00313         // infer the bundle length based on the packet length minus the
00314         // eth cl header
00315         bundle_len = len - sizeof(EthCLHeader) - sizeof(struct ether_header);
00316         
00317         log_debug("process_data: got ethcl header -- bundle id %d, length %zu",
00318                   ntohl(ethclhdr.bundle_id), bundle_len);
00319         
00320         // skip past the cl header
00321         bp  += (sizeof(EthCLHeader) + sizeof(struct ether_header));
00322         len -= (sizeof(EthCLHeader) + sizeof(struct ether_header));
00323 
00324         bundle = new Bundle();
00325         bool complete = false;
00326         int cc = BundleProtocol::consume(bundle, bp, len, &complete);
00327 
00328         if (cc < 0) {
00329             log_err("process_data: bundle protocol error");
00330             delete bundle;
00331             return;
00332         }
00333 
00334         if (!complete) {
00335             log_err("process_data: incomplete bundle");
00336             delete bundle;
00337             return;
00338         }
00339 
00340         log_debug("process_data: new bundle id %d arrival, bundle length %zu",
00341                   bundle->bundleid_, bundle_len);
00342         
00343         BundleDaemon::post(
00344             new BundleReceivedEvent(bundle, EVENTSRC_PEER, bundle_len));
00345     }
00346 }
00347 
00348 void
00349 EthConvergenceLayer::Receiver::run()
00350 {
00351     int sock;
00352     int cc;
00353     struct sockaddr_ll iface;
00354     unsigned char buffer[MAX_ETHER_PACKET];
00355 
00356     if((sock = socket(PF_PACKET,SOCK_RAW, htons(ETHERTYPE_DTN))) < 0) { 
00357         perror("socket");
00358         log_err("EthConvergenceLayer::Receiver::run() " 
00359                 "Couldn't open socket.");       
00360         exit(1);
00361     }
00362    
00363     // figure out the interface index of the device with name if_name_
00364     struct ifreq req;
00365     strcpy(req.ifr_name, if_name_);
00366     ioctl(sock, SIOCGIFINDEX, &req);
00367 
00368     memset(&iface, 0, sizeof(iface));
00369     iface.sll_family=AF_PACKET;
00370     iface.sll_protocol=htons(ETHERTYPE_DTN);
00371     iface.sll_ifindex=req.ifr_ifindex;
00372    
00373     if (bind(sock, (struct sockaddr *) &iface, sizeof(iface)) == -1) {
00374         perror("bind");
00375         exit(1);
00376     }
00377 
00378     log_warn("Reading from socket...");
00379     while(true) {
00380         cc=read (sock, buffer, MAX_ETHER_PACKET);
00381         if(cc<=0) {
00382             perror("EthConvergenceLayer::Receiver::run()");
00383             exit(1);
00384         }
00385         struct ether_header* hdr=(struct ether_header*)buffer;
00386   
00387         if(ntohs(hdr->ether_type)==ETHERTYPE_DTN) {
00388             process_data(buffer, cc);
00389         }
00390         else if(ntohs(hdr->ether_type)!=0x800)
00391         {
00392             log_err("Got non-DTN packet in Receiver, type %4X.",
00393                     ntohs(hdr->ether_type));
00394             // exit(1);
00395         }
00396 
00397         if(should_stop())
00398             break;
00399     }
00400 }
00401 
00402 /******************************************************************************
00403  *
00404  * EthConvergenceLayer::Sender
00405  *
00406  *****************************************************************************/
00407 
00411 EthConvergenceLayer::Sender::Sender(char* if_name,
00412                                     const ContactRef& contact)
00413     : Logger("EthConvergenceLayer::Sender", "/dtn/cl/eth/sender"),
00414       contact_(contact.object(), "EthConvergenceLayer::Sender")
00415 {
00416     struct ifreq req;
00417     struct sockaddr_ll iface;
00418     Link *link = contact->link();
00419 
00420     memset(src_hw_addr_.octet, 0, 6); // determined in Sender::run()
00421     EthernetScheme::parse(link->nexthop(), &dst_hw_addr_);
00422 
00423     strcpy(if_name_, if_name);
00424     sock_ = 0;
00425 
00426     memset(&req, 0, sizeof(req));
00427     memset(&iface, 0, sizeof(iface));
00428 
00429     // Get and bind a RAW socket for this contact. XXX/jakob - seems
00430     // like it'd be enough with one socket per interface, not one per
00431     // contact. figure this out some time.
00432     if((sock_ = socket(AF_PACKET,SOCK_RAW, htons(ETHERTYPE_DTN))) < 0) { 
00433         perror("socket");
00434         exit(1);
00435     }
00436 
00437     // get the interface name from the contact info
00438     strcpy(req.ifr_name, if_name_);
00439 
00440     // ifreq the interface index for binding the socket    
00441     ioctl(sock_, SIOCGIFINDEX, &req);
00442     
00443     iface.sll_family=AF_PACKET;
00444     iface.sll_protocol=htons(ETHERTYPE_DTN);
00445     iface.sll_ifindex=req.ifr_ifindex;
00446         
00447     // store away the ethernet address of the device in question
00448     if(ioctl(sock_, SIOCGIFHWADDR, &req))
00449     {
00450         perror("ioctl");
00451         exit(1);
00452     } 
00453     memcpy(src_hw_addr_.octet,req.ifr_hwaddr.sa_data,6);    
00454 
00455     if (bind(sock_, (struct sockaddr *) &iface, sizeof(iface)) == -1) {
00456         perror("bind");
00457         exit(1);
00458     }
00459 }
00460         
00461 /* 
00462  * Send one bundle.
00463  */
00464 bool 
00465 EthConvergenceLayer::Sender::send_bundle(Bundle* bundle) 
00466 {
00467     int cc;
00468     struct iovec iov[3];
00469         
00470     EthCLHeader ethclhdr;
00471     struct ether_header hdr;
00472 
00473     memset(iov,0,sizeof(iov));
00474     
00475     // iovec slot 0 holds the ethernet header
00476 
00477     iov[0].iov_base = (char*)&hdr;
00478     iov[0].iov_len = sizeof(struct ether_header);
00479 
00480     // write the ethernet header
00481 
00482     memcpy(hdr.ether_dhost,dst_hw_addr_.octet,6);
00483     memcpy(hdr.ether_shost,src_hw_addr_.octet,6); // Sender::hw_addr
00484     hdr.ether_type=htons(ETHERTYPE_DTN);
00485     
00486     // iovec slot 1 for the eth cl header
00487 
00488     iov[1].iov_base = (char*)&ethclhdr;
00489     iov[1].iov_len  = sizeof(EthCLHeader);
00490     
00491     // write the ethcl header
00492 
00493     ethclhdr.version    = ETHCL_VERSION;
00494     ethclhdr.type       = ETHCL_BUNDLE;
00495     ethclhdr.bundle_id  = htonl(bundle->bundleid_);    
00496 
00497     // iovec slot 2 for the bundle
00498     BlockInfoVec* blocks = bundle->xmit_blocks_.find_blocks(contact_->link());
00499     ASSERT(blocks != NULL);
00500 
00501     bool complete = false;
00502     size_t total_len = BundleProtocol::produce(bundle, blocks,
00503                                                buf_, 0, sizeof(buf_),
00504                                                &complete);
00505     if (!complete) {
00506         size_t formatted_len = BundleProtocol::total_length(blocks);
00507         log_err("send_bundle: bundle too big (%zu > %u)",
00508                 formatted_len, MAX_ETHER_PACKET);
00509         return -1;
00510     }
00511 
00512     iov[2].iov_base = (char *)buf_;
00513     iov[2].iov_len  = total_len;
00514     
00515     // We're done assembling the packet. Now write the whole thing to
00516     // the socket!
00517     log_info("Sending bundle out interface %s",if_name_);
00518 
00519     cc=IO::writevall(sock_, iov, 3);
00520     if(cc<0) {
00521         perror("send");
00522         log_err("Send failed!\n");
00523     }    
00524     log_info("Sent packet, size: %d",cc );
00525     
00526     // check that we successfully wrote it all
00527     bool ok;
00528     int total = sizeof(EthCLHeader) + sizeof(struct ether_header) + total_len;
00529     if (cc != total) {
00530         BundleDaemon::post(new BundleTransmitFailedEvent(bundle,
00531                                                          contact_,
00532                                                          contact_->link()));
00533         log_err("send_bundle: error writing bundle (wrote %d/%d): %s",
00534                 cc, total, strerror(errno));
00535         ok = false;
00536     } else {
00537         // cons up a transmission event and pass it to the router
00538         // since this is an unreliable protocol, acked_len = 0, and
00539         // ack = false
00540         BundleDaemon::post(
00541             new BundleTransmittedEvent(bundle, contact_,contact_->link(),
00542                                        total_len, false));
00543         ok = true;
00544     }
00545 
00546     return ok;
00547 }
00548 
00549 EthConvergenceLayer::Beacon::Beacon(const char* if_name,
00550                                     unsigned int beacon_interval)
00551   : Logger("EthConvergenceLayer::Beacon", "/dtn/cl/eth/beacon"),
00552     Thread("EthConvergenceLayer::Beacon")
00553 {
00554     Thread::flags_ |= INTERRUPTABLE;
00555     memset(if_name_, 0, IFNAMSIZ);
00556     strcpy(if_name_, if_name);
00557     beacon_interval_ = beacon_interval;
00558 }
00559 
00560 void EthConvergenceLayer::Beacon::run()
00561 {
00562     // ethernet broadcast address
00563     char bcast_mac_addr[6]={0xff,0xff,0xff,0xff,0xff,0xff};
00564     
00565     struct ether_header hdr;
00566     struct sockaddr_ll iface;
00567     EthCLHeader ethclhdr;
00568     
00569     int sock,cc;
00570     struct iovec iov[2];
00571     
00572     memset(&hdr,0,sizeof(hdr));
00573     memset(&ethclhdr,0,sizeof(ethclhdr));
00574     memset(&iface,0,sizeof(iface));
00575 
00576     ethclhdr.version = ETHCL_VERSION;
00577     ethclhdr.type    = ETHCL_BEACON;
00578     
00579     hdr.ether_type=htons(ETHERTYPE_DTN);
00580     
00581     // iovec slot 0 holds the ethernet header
00582     iov[0].iov_base = (char*)&hdr;
00583     iov[0].iov_len = sizeof(struct ether_header);
00584     
00585     // use iovec slot 1 for the eth cl header
00586     iov[1].iov_base = (char*)&ethclhdr;
00587     iov[1].iov_len  = sizeof(EthCLHeader); 
00588     
00589     /* 
00590        Get ourselves a raw socket, and configure it.
00591     */
00592     if((sock = socket(AF_PACKET,SOCK_RAW, htons(ETHERTYPE_DTN))) < 0) { 
00593         perror("socket");
00594         exit(1);
00595     }
00596 
00597     struct ifreq req;
00598     strcpy(req.ifr_name, if_name_);
00599     if(ioctl(sock, SIOCGIFINDEX, &req))
00600     {
00601         perror("ioctl");
00602         exit(1);
00603     }    
00604 
00605     iface.sll_ifindex=req.ifr_ifindex;
00606 
00607     if(ioctl(sock, SIOCGIFHWADDR, &req))
00608     {
00609         perror("ioctl");
00610         exit(1);
00611     } 
00612     
00613     memcpy(hdr.ether_dhost,bcast_mac_addr,6);
00614     memcpy(hdr.ether_shost,req.ifr_hwaddr.sa_data,6);
00615     
00616     log_info("Interface %s has interface number %d.",if_name_,req.ifr_ifindex);
00617     
00618     iface.sll_family=AF_PACKET;
00619     iface.sll_protocol=htons(ETHERTYPE_DTN);
00620     
00621     if (bind(sock, (struct sockaddr *) &iface, sizeof(iface)) == -1) {
00622         perror("bind");
00623         exit(1);
00624     }
00625 
00626     /*
00627      * Send the beacon on the socket every beacon_interval_ second.
00628      */
00629     while(1) {
00630         sleep(beacon_interval_);
00631 
00632         if (should_stop())
00633             break;
00634 
00635         log_debug("Sent beacon out interface %s.\n",if_name_ );
00636         
00637         cc=IO::writevall(sock, iov, 2);
00638         if(cc<0) {
00639             perror("send beacon");
00640             log_err("Send beacon failed!\n");
00641         }
00642     }
00643 }
00644 
00645 EthConvergenceLayer::BeaconTimer::BeaconTimer(char * next_hop)
00646     :  Logger("EthConvergenceLayer::BeaconTimer", "/dtn/cl/eth/beacontimer")
00647 {
00648     next_hop_=(char*)malloc(strlen(next_hop)+1);
00649     strcpy(next_hop_, next_hop);
00650 }
00651 
00652 EthConvergenceLayer::BeaconTimer::~BeaconTimer()
00653 {
00654     free(next_hop_);
00655 }
00656 
00657 void
00658 EthConvergenceLayer::BeaconTimer::timeout(const struct timeval& now)
00659 {
00660     ContactManager* cm = BundleDaemon::instance()->contactmgr();
00661     ConvergenceLayer* cl = ConvergenceLayer::find_clayer("eth");
00662     Link * l = cm->find_link_to(cl, next_hop_);
00663 
00664     (void)now;
00665 
00666     log_info("Neighbor %s timer expired.",next_hop_);
00667 
00668     if(l == 0) {
00669       log_warn("No link for next_hop %s.",next_hop_);
00670     }
00671     else if(l->isopen()) {
00672         BundleDaemon::post(
00673             new LinkStateChangeRequest(l, Link::CLOSED,
00674                                        ContactDownEvent::BROKEN));
00675     }
00676     else {
00677         log_warn("next_hop %s unexpectedly not open",next_hop_);
00678     }
00679 }
00680 
00681 Timer *
00682 EthConvergenceLayer::BeaconTimer::copy()
00683 {
00684     return new BeaconTimer(*this);
00685 }
00686 
00687 } // namespace dtn
00688 
00689 #endif // __linux__

Generated on Thu Jun 7 12:54:27 2007 for DTN Reference Implementation by  doxygen 1.5.1