00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include <sys/types.h>
00040 #include <sys/stat.h>
00041 #include <dirent.h>
00042 #include <errno.h>
00043 #include <fcntl.h>
00044 #include <unistd.h>
00045 #include <netinet/in.h>
00046
00047 #include <oasys/io/IO.h>
00048 #include <oasys/util/StringBuffer.h>
00049 #include <oasys/util/URL.h>
00050
00051 #include "FileConvergenceLayer.h"
00052 #include "bundling/Bundle.h"
00053 #include "bundling/BundleEvent.h"
00054 #include "bundling/BundleList.h"
00055 #include "bundling/BundleProtocol.h"
00056 #include "bundling/BundleDaemon.h"
00057
00058 namespace dtn {
00059
00060
00061
00062
00063
00064
00065 FileConvergenceLayer::FileConvergenceLayer()
00066 : ConvergenceLayer("FileConvergenceLayer", "file")
00067 {
00068 }
00069
00073 bool
00074 FileConvergenceLayer::extract_dir(const char* nexthop, std::string* dirp)
00075 {
00076
00077 PANIC("XXX/demmer fix this implementation");
00078
00079 oasys::URL url(nexthop);
00080
00081 if (! url.valid()) {
00082 log_err("extract_dir: next hop ssp '%s' not a valid url", nexthop);
00083 return false;
00084 }
00085
00086
00087
00088
00089
00090
00091 if (url.host_.length() != 0) {
00092 log_err("interface eid '%s' specifies a non-absolute path",
00093 nexthop);
00094 return false;
00095 }
00096
00097
00098 if (url.port_ != 0) {
00099 log_err("interface eid '%s' specifies a port", nexthop);
00100 return false;
00101 }
00102
00103 dirp->assign("/");
00104 dirp->append(url.path_);
00105 return true;
00106 }
00107
00112 bool
00113 FileConvergenceLayer::validate_dir(const std::string& dir)
00114 {
00115 struct stat st;
00116 if (stat(dir.c_str(), &st) != 0) {
00117 log_err("error running stat on %s: %s", dir.c_str(), strerror(errno));
00118 return false;
00119 }
00120
00121 if (!S_ISDIR(st.st_mode)) {
00122 log_err("error: %s not a directory", dir.c_str());
00123 return false;
00124 }
00125
00126
00127
00128 return true;
00129 }
00130
00134 bool
00135 FileConvergenceLayer::interface_up(Interface* iface,
00136 int argc, const char* argv[])
00137 {
00138 (void)iface;
00139 (void)argc;
00140 (void)argv;
00141
00142 NOTIMPLEMENTED;
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166 return true;
00167 }
00168
00172 bool
00173 FileConvergenceLayer::interface_down(Interface* iface)
00174 {
00175 CLInfo *cli = iface->cl_info();
00176 Scanner *scanner = (Scanner *)cli;
00177 scanner->stop();
00178
00179
00180
00181
00182
00183 return true;
00184 }
00185
00189 bool
00190 FileConvergenceLayer::open_contact(const ContactRef& contact)
00191 {
00192 (void)contact;
00193
00194
00195
00196
00197
00198
00199
00200
00201
00202
00203
00204
00205
00206 return true;
00207 }
00208
00212 bool
00213 FileConvergenceLayer::close_contact(const ContactRef& contact)
00214 {
00215 (void)contact;
00216
00217 return true;
00218 }
00219
00223 void
00224 FileConvergenceLayer::send_bundle(const ContactRef& contact, Bundle* bundle)
00225 {
00226 (void)contact;
00227 (void)bundle;
00228
00229
00230 NOTIMPLEMENTED;
00231
00232 #ifdef notimplemented
00233 std::string dir;
00234 if (!extract_dir(contact->nexthop(), &dir)) {
00235 PANIC("contact should have already been validated");
00236 }
00237
00238 FileHeader filehdr;
00239 int iovcnt = BundleProtocol::MAX_IOVCNT + 2;
00240 struct iovec iov[iovcnt];
00241
00242 filehdr.version = CURRENT_VERSION;
00243
00244 oasys::StringBuffer fname("%s/bundle-XXXXXX", dir.c_str());
00245
00246 iov[0].iov_base = (char*)&filehdr;
00247 iov[0].iov_len = sizeof(FileHeader);
00248
00249
00250 u_int16_t header_len =
00251 BundleProtocol::format_header_blocks(bundle, &iov[1], &iovcnt);
00252
00253
00254 size_t payload_len = bundle->payload_.length();
00255 filehdr.header_length = htons(header_len);
00256 filehdr.bundle_length = htonl(header_len + payload_len);
00257
00258
00259
00260 iovcnt++;
00261 PANIC("XXX/demmer fix me");
00262
00263 iov[iovcnt].iov_len = payload_len;
00264 iovcnt++;
00265
00266
00267 int fd = mkstemp(fname.c_str());
00268 if (fd == -1) {
00269 log_err("error opening temp file in %s: %s",
00270 fname.c_str(), strerror(errno));
00271
00272 return;
00273 }
00274
00275 log_debug("opened temp file %s for bundle id %d "
00276 "fd %d header_length %zu payload_length %zu",
00277 fname.c_str(), bundle->bundleid_, fd,
00278 header_len, payload_len);
00279
00280
00281 int total = sizeof(FileHeader) + header_len + payload_len;
00282 int cc = oasys::IO::writevall(fd, iov, iovcnt, logpath_);
00283 if (cc != total) {
00284 log_err("error writing out bundle (wrote %d/%d): %s",
00285 cc, total, strerror(errno));
00286 }
00287
00288
00289 BundleProtocol::free_header_iovmem(bundle, &iov[1], iovcnt - 2);
00290
00291
00292 close(fd);
00293
00294
00295 bool acked = false;
00296 BundleDaemon::post(
00297 new BundleTransmittedEvent(bundle, contact, payload_len, acked));
00298
00299 log_debug("bundle id %d successfully transmitted", bundle->bundleid_);
00300 #endif // notimplemented
00301 }
00302
00303
00304
00305
00306
00307
00308 FileConvergenceLayer::Scanner::Scanner(int secs_per_scan,
00309 const std::string& dir)
00310 : Logger("FileConvergenceLayer::Scanner",
00311 "/dtn/cl/file/scanner"),
00312 Thread("FileConvergenceLayer::Scanner"),
00313 secs_per_scan_(secs_per_scan),
00314 dir_(dir),
00315 run_(true)
00316 {
00317 set_flag(DELETE_ON_EXIT);
00318 }
00319
00323 void
00324 FileConvergenceLayer::Scanner::run()
00325 {
00326 FileHeader filehdr;
00327 DIR* dir = opendir(dir_.c_str());
00328 struct dirent* dirent;
00329 const char* fname;
00330 u_char* buf;
00331 int fd;
00332
00333 if (!dir) {
00334
00335 log_err("error in opendir");
00336 return;
00337 }
00338
00339 while (run_) {
00340 seekdir(dir, 0);
00341
00342 while ((dirent = readdir(dir)) != 0) {
00343 fname = dirent->d_name;
00344
00345 if ((fname[0] == '.') &&
00346 ((fname[1] == '\0') ||
00347 (fname[1] == '.' && fname[2] == '\0')))
00348 {
00349 continue;
00350 }
00351
00352 log_debug("scan found file %s", fname);
00353
00354
00355 oasys::StringBuffer path("%s/%s", dir_.c_str(), fname);
00356
00357
00358
00359 if ((fd = open(path.c_str(), 0)) == -1) {
00360 log_err("error opening file %s: %s", path.c_str(), strerror(errno));
00361 continue;
00362 }
00363
00364 int cc = oasys::IO::readall(fd, (char*)&filehdr, sizeof(FileHeader));
00365 if (cc != sizeof(FileHeader)) {
00366 log_warn("can't read in FileHeader (read %d/%zu): %s",
00367 cc, sizeof(FileHeader), strerror(errno));
00368 continue;
00369 }
00370
00371 if (filehdr.version != CURRENT_VERSION) {
00372 log_warn("framing protocol version mismatch: %d != current %d",
00373 filehdr.version, CURRENT_VERSION);
00374 continue;
00375 }
00376
00377 u_int16_t header_len = ntohs(filehdr.header_length);
00378 size_t bundle_len = ntohl(filehdr.bundle_length);
00379
00380 log_debug("found bundle file %s: header_length %u bundle_length %zu",
00381 path.c_str(), header_len, bundle_len);
00382
00383
00384 buf = (u_char*)malloc(header_len);
00385 cc = oasys::IO::readall(fd, (char*)buf, header_len);
00386 if (cc != header_len) {
00387 log_err("error reading file %s header (read %d/%d): %s",
00388 path.c_str(), cc, header_len, strerror(errno));
00389 free(buf);
00390 continue;
00391 }
00392
00393 Bundle* bundle = new Bundle();
00394 if (! BundleProtocol::parse_header_blocks(bundle, buf, header_len)) {
00395 log_err("error parsing bundle headers in file %s", path.c_str());
00396 free(buf);
00397 delete bundle;
00398 continue;
00399 }
00400 free(buf);
00401
00402
00403 size_t payload_len = bundle->payload_.length();
00404 if (bundle_len != header_len + payload_len) {
00405 log_err("error in bundle lengths in file %s: "
00406 "bundle_length %zu, header_length %u, payload_length %zu",
00407 path.c_str(), bundle_len, header_len, payload_len);
00408 delete bundle;
00409 continue;
00410 }
00411
00412
00413 buf = (u_char*)malloc(payload_len);
00414 cc = oasys::IO::readall(fd, (char*)buf, payload_len);
00415 if (cc != (int)payload_len) {
00416 log_err("error reading file %s payload (read %d/%zu): %s",
00417 path.c_str(), cc, payload_len, strerror(errno));
00418 delete bundle;
00419 continue;
00420 }
00421 bundle->payload_.set_data(buf, payload_len);
00422 free(buf);
00423
00424
00425 if (close(fd) != 0) {
00426 log_err("error closing file %s: %s",
00427 path.c_str(), strerror(errno));
00428 }
00429
00430 if (unlink(path.c_str()) != 0) {
00431 log_err("error removing file %s: %s",
00432 path.c_str(), strerror(errno));
00433 }
00434
00435
00436 BundleDaemon::post(
00437 new BundleReceivedEvent(bundle, EVENTSRC_PEER));
00438 }
00439
00440 sleep(secs_per_scan_);
00441 }
00442 log_info("exiting");
00443 }
00444
00448 void FileConvergenceLayer::Scanner::stop() {
00449 run_ = false;
00450 }
00451
00452 }