FileConvergenceLayer.cc

Go to the documentation of this file.
00001 /*
00002  * IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING. By
00003  * downloading, copying, installing or using the software you agree to
00004  * this license. If you do not agree to this license, do not download,
00005  * install, copy or use the software.
00006  * 
00007  * Intel Open Source License 
00008  * 
00009  * Copyright (c) 2004 Intel Corporation. All rights reserved. 
00010  * 
00011  * Redistribution and use in source and binary forms, with or without
00012  * modification, are permitted provided that the following conditions are
00013  * met:
00014  * 
00015  *   Redistributions of source code must retain the above copyright
00016  *   notice, this list of conditions and the following disclaimer.
00017  * 
00018  *   Redistributions in binary form must reproduce the above copyright
00019  *   notice, this list of conditions and the following disclaimer in the
00020  *   documentation and/or other materials provided with the distribution.
00021  * 
00022  *   Neither the name of the Intel Corporation nor the names of its
00023  *   contributors may be used to endorse or promote products derived from
00024  *   this software without specific prior written permission.
00025  *  
00026  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00027  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00028  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
00029  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE INTEL OR
00030  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00031  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00032  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00033  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00034  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00035  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00036  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00037  */
00038 
00039 #include <sys/types.h>
00040 #include <sys/stat.h>
00041 #include <dirent.h>
00042 #include <errno.h>
00043 #include <fcntl.h>
00044 #include <unistd.h>
00045 #include <netinet/in.h>
00046 
00047 #include <oasys/io/IO.h>
00048 #include <oasys/util/StringBuffer.h>
00049 #include <oasys/util/URL.h>
00050 
00051 #include "FileConvergenceLayer.h"
00052 #include "bundling/Bundle.h"
00053 #include "bundling/BundleEvent.h"
00054 #include "bundling/BundleList.h"
00055 #include "bundling/BundleProtocol.h"
00056 #include "bundling/BundleDaemon.h"
00057 
00058 namespace dtn {
00059 
00060 /******************************************************************************
00061  *
00062  * FileConvergenceLayer
00063  *
00064  *****************************************************************************/
00065 FileConvergenceLayer::FileConvergenceLayer()
00066     : ConvergenceLayer("FileConvergenceLayer", "file")
00067 {
00068 }
00069 
00073 bool
00074 FileConvergenceLayer::extract_dir(const char* nexthop, std::string* dirp)
00075 {
00076 
00077     PANIC("XXX/demmer fix this implementation");
00078     
00079     oasys::URL url(nexthop);
00080     
00081     if (! url.valid()) {
00082         log_err("extract_dir: next hop ssp '%s' not a valid url", nexthop);
00083         return false;
00084     }
00085 
00086     // the ssp part of the URL should be of the form:
00087     // /path1/path2
00088 
00089     // validate that the "host" part of the url is empty, i.e. that
00090     // the filesystem path is absolute
00091     if (url.host_.length() != 0) {
00092         log_err("interface eid '%s' specifies a non-absolute path",
00093                 nexthop);
00094         return false;
00095     }
00096 
00097     // and make sure there wasn't a port that was parsed out
00098     if (url.port_ != 0) {
00099         log_err("interface eid '%s' specifies a port", nexthop);
00100         return false;
00101     }
00102 
00103     dirp->assign("/");
00104     dirp->append(url.path_);
00105     return true;
00106 }
00107 
00112 bool
00113 FileConvergenceLayer::validate_dir(const std::string& dir)
00114 {
00115     struct stat st;
00116     if (stat(dir.c_str(), &st) != 0) {
00117         log_err("error running stat on %s: %s", dir.c_str(), strerror(errno));
00118         return false;
00119     }
00120 
00121     if (!S_ISDIR(st.st_mode)) {
00122         log_err("error: %s not a directory", dir.c_str());
00123         return false;
00124     }
00125 
00126     // XXX/demmer check permissions
00127 
00128     return true;
00129 }
00130 
00134 bool
00135 FileConvergenceLayer::interface_up(Interface* iface,
00136                                    int argc, const char* argv[])
00137 {
00138     (void)iface;
00139     (void)argc;
00140     (void)argv;
00141     
00142     NOTIMPLEMENTED;
00143     
00144 //     // parse out the directory from the interface
00145 //     std::string dir;
00146 //     if (!extract_dir(iface->eid().c_str(), &dir)) {
00147 //         return false;
00148 //     }
00149     
00150 //     // make sure the directory exists and is readable / executable
00151 //     if (!validate_dir(dir)) {
00152 //         return false;
00153 //     }
00154     
00155 //     // XXX/demmer parse argv for frequency
00156 //     int secs_per_scan = 5;
00157 
00158 //     // create a new thread to scan for new bundle files
00159 //     Scanner* scanner = new Scanner(secs_per_scan, dir);
00160 //     scanner->start();
00161 
00162 //     // store the new scanner in the cl specific part of the interface
00163 //     iface->set_cl_info(scanner);
00164 
00165     
00166     return true;
00167 }
00168 
00172 bool
00173 FileConvergenceLayer::interface_down(Interface* iface)
00174 {
00175     CLInfo *cli = iface->cl_info();
00176     Scanner *scanner = (Scanner *)cli;
00177     scanner->stop();
00178 
00179     // We cannot "delete scanner;" because it is still running
00180     // right now. oasys::Thread::thread_run deletes the Scanner object
00181     // when Scanner::run() returns 
00182 
00183     return true;
00184 }
00185  
00189 bool
00190 FileConvergenceLayer::open_contact(const ContactRef& contact)
00191 {
00192     (void)contact;
00193     // XXX/demmer fixme
00194     
00195     // parse out the directory from the contact
00196 //     std::string dir;
00197 //     if (!extract_dir(contact->nexthop(), &dir)) {
00198 //         return false;
00199 //     }
00200     
00201 //     // make sure the directory exists and is readable / executable
00202 //     if (!validate_dir(dir)) {
00203 //         return false;
00204 //     }
00205 
00206     return true;
00207 }
00208 
00212 bool
00213 FileConvergenceLayer::close_contact(const ContactRef& contact)
00214 {
00215     (void)contact;
00216     // nothing to do
00217     return true;
00218 }
00219     
00223 void
00224 FileConvergenceLayer::send_bundle(const ContactRef& contact, Bundle* bundle)
00225 {
00226     (void)contact;
00227     (void)bundle;
00228     
00229     // XXX/demmer fix this at some point
00230     NOTIMPLEMENTED;
00231 
00232 #ifdef notimplemented
00233     std::string dir;
00234     if (!extract_dir(contact->nexthop(), &dir)) {
00235         PANIC("contact should have already been validated");
00236     }
00237 
00238     FileHeader filehdr;
00239     int iovcnt = BundleProtocol::MAX_IOVCNT + 2;
00240     struct iovec iov[iovcnt];
00241 
00242     filehdr.version = CURRENT_VERSION;
00243     
00244     oasys::StringBuffer fname("%s/bundle-XXXXXX", dir.c_str());
00245     
00246     iov[0].iov_base = (char*)&filehdr;
00247     iov[0].iov_len  = sizeof(FileHeader);
00248 
00249     // fill in the bundle header portion
00250     u_int16_t header_len =
00251         BundleProtocol::format_header_blocks(bundle, &iov[1], &iovcnt);
00252 
00253     // fill in the file header
00254     size_t payload_len = bundle->payload_.length();
00255     filehdr.header_length = htons(header_len);
00256     filehdr.bundle_length = htonl(header_len + payload_len);
00257 
00258     // and tack on the payload (adding one to iovcnt for the
00259     // FileHeader, then one for the payload)
00260     iovcnt++;
00261     PANIC("XXX/demmer fix me");
00262     //iov[iovcnt].iov_base = (void*)bundle->payload_.data();
00263     iov[iovcnt].iov_len  = payload_len;
00264     iovcnt++;
00265 
00266     // open the bundle file 
00267     int fd = mkstemp(fname.c_str());
00268     if (fd == -1) {
00269         log_err("error opening temp file in %s: %s",
00270                 fname.c_str(), strerror(errno));
00271         // XXX/demmer report error here?
00272         return;
00273     }
00274 
00275     log_debug("opened temp file %s for bundle id %d "
00276               "fd %d header_length %zu payload_length %zu",
00277               fname.c_str(), bundle->bundleid_, fd,
00278               header_len, payload_len);
00279 
00280     // now write everything out
00281     int total = sizeof(FileHeader) + header_len + payload_len;
00282     int cc = oasys::IO::writevall(fd, iov, iovcnt, logpath_);
00283     if (cc != total) {
00284         log_err("error writing out bundle (wrote %d/%d): %s",
00285                 cc, total, strerror(errno));
00286     }
00287 
00288     // free up the iovec data
00289     BundleProtocol::free_header_iovmem(bundle, &iov[1], iovcnt - 2);
00290         
00291     // close the file descriptor
00292     close(fd);
00293 
00294     // cons up a transmission event and pass it to the router
00295     bool acked = false;
00296     BundleDaemon::post(
00297         new BundleTransmittedEvent(bundle, contact, payload_len, acked));
00298         
00299     log_debug("bundle id %d successfully transmitted", bundle->bundleid_);
00300 #endif // notimplemented
00301 }
00302 
00303 /******************************************************************************
00304  *
00305  * FileConvergenceLayer::Scanner
00306  *
00307  *****************************************************************************/
00308 FileConvergenceLayer::Scanner::Scanner(int secs_per_scan, 
00309                                        const std::string& dir)
00310     : Logger("FileConvergenceLayer::Scanner",
00311              "/dtn/cl/file/scanner"), 
00312       Thread("FileConvergenceLayer::Scanner"), 
00313       secs_per_scan_(secs_per_scan), 
00314       dir_(dir), 
00315       run_(true)
00316 {
00317     set_flag(DELETE_ON_EXIT);
00318 }
00319 
00323 void
00324 FileConvergenceLayer::Scanner::run()
00325 {
00326     FileHeader filehdr;
00327     DIR* dir = opendir(dir_.c_str());
00328     struct dirent* dirent;
00329     const char* fname;
00330     u_char* buf;
00331     int fd;
00332 
00333     if (!dir) {
00334         // XXX/demmer signal cl somehow?
00335         log_err("error in opendir");
00336         return;
00337     }
00338     
00339     while (run_) {
00340         seekdir(dir, 0);
00341 
00342         while ((dirent = readdir(dir)) != 0) {
00343             fname = dirent->d_name;
00344 
00345             if ((fname[0] == '.') &&
00346                 ((fname[1] == '\0') ||
00347                  (fname[1] == '.' && fname[2] == '\0')))
00348             {
00349                 continue;
00350             }
00351             
00352             log_debug("scan found file %s", fname);
00353 
00354             // cons up the full path
00355             oasys::StringBuffer path("%s/%s", dir_.c_str(), fname);
00356 
00357             // malloc a buffer for it, open a file descriptor, and
00358             // read in the header
00359             if ((fd = open(path.c_str(), 0)) == -1) {
00360                 log_err("error opening file %s: %s", path.c_str(), strerror(errno));
00361                 continue;
00362             }
00363 
00364             int cc = oasys::IO::readall(fd, (char*)&filehdr, sizeof(FileHeader));
00365             if (cc != sizeof(FileHeader)) {
00366                 log_warn("can't read in FileHeader (read %d/%zu): %s",
00367                          cc, sizeof(FileHeader), strerror(errno));
00368                 continue;
00369             }
00370 
00371             if (filehdr.version != CURRENT_VERSION) {
00372                 log_warn("framing protocol version mismatch: %d != current %d",
00373                          filehdr.version, CURRENT_VERSION);
00374                 continue;
00375             }
00376 
00377             u_int16_t header_len = ntohs(filehdr.header_length);
00378             size_t bundle_len = ntohl(filehdr.bundle_length);
00379             
00380             log_debug("found bundle file %s: header_length %u bundle_length %zu",
00381                       path.c_str(), header_len, bundle_len);
00382 
00383             // read in and parse the headers
00384             buf = (u_char*)malloc(header_len);
00385             cc = oasys::IO::readall(fd, (char*)buf, header_len);
00386             if (cc != header_len) {
00387                 log_err("error reading file %s header (read %d/%d): %s",
00388                         path.c_str(), cc, header_len, strerror(errno));
00389                 free(buf);
00390                 continue;
00391             }
00392 
00393             Bundle* bundle = new Bundle();
00394             if (! BundleProtocol::parse_header_blocks(bundle, buf, header_len)) {
00395                 log_err("error parsing bundle headers in file %s", path.c_str());
00396                 free(buf);
00397                 delete bundle;
00398                 continue;
00399             }
00400             free(buf);
00401 
00402             // Now validate the lengths
00403             size_t payload_len = bundle->payload_.length();
00404             if (bundle_len != header_len + payload_len) {
00405                 log_err("error in bundle lengths in file %s: "
00406                         "bundle_length %zu, header_length %u, payload_length %zu",
00407                         path.c_str(), bundle_len, header_len, payload_len);
00408                 delete bundle;
00409                 continue;
00410             }
00411 
00412             // Looks good, now read in and assign the data
00413             buf = (u_char*)malloc(payload_len);
00414             cc = oasys::IO::readall(fd, (char*)buf, payload_len);
00415             if (cc != (int)payload_len) {
00416                 log_err("error reading file %s payload (read %d/%zu): %s",
00417                         path.c_str(), cc, payload_len, strerror(errno));
00418                 delete bundle;
00419                 continue;
00420             }
00421             bundle->payload_.set_data(buf, payload_len);
00422             free(buf);
00423             
00424             // close the file descriptor and remove the file
00425             if (close(fd) != 0) {
00426                 log_err("error closing file %s: %s",
00427                         path.c_str(), strerror(errno));
00428             }
00429             
00430             if (unlink(path.c_str()) != 0) {
00431                 log_err("error removing file %s: %s",
00432                         path.c_str(), strerror(errno));
00433             }
00434 
00435             // all set, notify the router
00436             BundleDaemon::post(
00437                 new BundleReceivedEvent(bundle, EVENTSRC_PEER));
00438         }
00439             
00440         sleep(secs_per_scan_);
00441     }
00442     log_info("exiting");
00443 }
00444 
00448 void FileConvergenceLayer::Scanner::stop() {
00449     run_ = false;
00450 }
00451 
00452 } // namespace dtn

Generated on Fri Dec 22 14:47:59 2006 for DTN Reference Implementation by  doxygen 1.5.1