BundlePayload.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2004-2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 
00018 #include <errno.h>
00019 #include <sys/types.h>
00020 #include <sys/stat.h>
00021 #include <oasys/debug/DebugUtils.h>
00022 #include <oasys/thread/SpinLock.h>
00023 #include <oasys/util/ScratchBuffer.h>
00024 #include <oasys/util/StringBuffer.h>
00025 
00026 #include "BundlePayload.h"
00027 #include "storage/BundleStore.h"
00028 
00029 namespace dtn {
00030 
00031 //----------------------------------------------------------------------
00032 size_t BundlePayload::mem_threshold_ = 16384;
00033 bool BundlePayload::test_no_remove_ = false;
00034 
00035 //----------------------------------------------------------------------
00036 BundlePayload::BundlePayload(oasys::SpinLock* lock)
00037     : Logger("BundlePayload", "/dtn/bundle/payload"),
00038       location_(DISK), length_(0), rcvd_length_(0), 
00039       cur_offset_(0), base_offset_(0), lock_(lock)
00040 {
00041 }
00042 
00043 //----------------------------------------------------------------------
00044 void
00045 BundlePayload::init(int bundleid, location_t location)
00046 {
00047     location_ = location;
00048     
00049     logpathf("/dtn/bundle/payload/%d", bundleid);
00050 
00051     // initialize the file handle for the backing store, but
00052     // immediately close it
00053     if (location == NODATA) {
00054         return;
00055     }
00056     BundleStore* bs = BundleStore::instance();
00057     oasys::StringBuffer path("%s/bundle_%d.dat",
00058                              bs->payload_dir().c_str(), bundleid);
00059     
00060     file_.logpathf("%s/file", logpath_);
00061     
00062     int open_errno = 0;
00063     int err = file_.open(path.c_str(), O_EXCL | O_CREAT | O_RDWR,
00064                          S_IRUSR | S_IWUSR, &open_errno);
00065     
00066     if (err < 0 && open_errno == EEXIST)
00067     {
00068         log_err("payload file %s already exists: overwriting and retrying",
00069                 path.c_str());
00070 
00071         err = file_.open(path.c_str(), O_RDWR, S_IRUSR | S_IWUSR);
00072     }
00073         
00074     if (err < 0)
00075     {
00076         log_crit("error opening payload file %s: %s",
00077                  path.c_str(), strerror(errno));
00078         return;
00079     }
00080 
00081     int fd = bs->payload_fdcache()->put_and_pin(file_.path(), file_.fd());
00082     if (fd != file_.fd()) {
00083         PANIC("duplicate entry in open fd cache");
00084     }
00085     unpin_file();
00086 }
00087 
00088 //----------------------------------------------------------------------
00089 void
00090 BundlePayload::init_from_store(int bundleid)
00091 {
00092     location_ = DISK;
00093 
00094     BundleStore* bs = BundleStore::instance();
00095     oasys::StringBuffer path("%s/bundle_%d.dat",
00096                              bs->payload_dir().c_str(), bundleid);
00097 
00098     file_.logpathf("%s/file", logpath_);
00099     
00100     if (file_.open(path.c_str(), O_RDWR, S_IRUSR | S_IWUSR) < 0)
00101     {
00102         log_crit("error opening payload file %s: %s",
00103                  path.c_str(), strerror(errno));
00104         return;
00105     }
00106 
00107     int fd = bs->payload_fdcache()->put_and_pin(file_.path(), file_.fd());
00108     if (fd != file_.fd()) {
00109         PANIC("duplicate entry in open fd cache");
00110     }
00111     unpin_file();
00112 }
00113 
00114 //----------------------------------------------------------------------
00115 BundlePayload::~BundlePayload()
00116 {
00117     if (location_ != NODATA && file_.is_open()) {
00118         BundleStore::instance()->payload_fdcache()->close(file_.path());
00119         file_.set_fd(-1); // avoid duplicate close
00120         
00121         if (!test_no_remove_)
00122             file_.unlink();
00123     }
00124 }
00125 
00126 //----------------------------------------------------------------------
00127 void
00128 BundlePayload::serialize(oasys::SerializeAction* a)
00129 {
00130     a->process("length",      (u_int32_t*)&length_);
00131     a->process("rcvd_length", (u_int32_t*)&rcvd_length_);
00132     a->process("base_offset", (u_int32_t*)&base_offset_);
00133 }
00134 
00135 //----------------------------------------------------------------------
00136 void
00137 BundlePayload::set_length(size_t length, location_t new_location)
00138 {
00139     oasys::ScopeLock l(lock_, "BundlePayload::set_length");
00140 
00141     length_ = length;
00142 
00143     if (location_ == UNDETERMINED) {
00144         if (new_location == UNDETERMINED) {
00145             location_ = (length_ < mem_threshold_) ? MEMORY : DISK;
00146         } else {
00147             location_ = new_location;
00148         }
00149     }
00150 
00151     ASSERT(location_ != UNDETERMINED);
00152 }
00153 
00154 
00155 //----------------------------------------------------------------------
00156 void
00157 BundlePayload::pin_file()
00158 {
00159     BundleStore* bs = BundleStore::instance();
00160     int fd = bs->payload_fdcache()->get_and_pin(file_.path());
00161     
00162     if (fd == -1) {
00163         if (file_.reopen(O_RDWR) < 0) {
00164             log_err("error reopening file %s: %s",
00165                     file_.path(), strerror(errno));
00166             return;
00167         }
00168         
00169         cur_offset_ = 0;
00170 
00171         int fd = bs->payload_fdcache()->put_and_pin(file_.path(), file_.fd());
00172         if (fd != file_.fd()) {
00173             PANIC("duplicate entry in open fd cache");
00174         }
00175         
00176     } else {
00177         ASSERT(fd == file_.fd());
00178     }
00179 }
00180 
00181 //----------------------------------------------------------------------
00182 void
00183 BundlePayload::unpin_file()
00184 {
00185     BundleStore::instance()->payload_fdcache()->unpin(file_.path());
00186 }
00187 
00188 //----------------------------------------------------------------------
00189 void
00190 BundlePayload::truncate(size_t length)
00191 {
00192     oasys::ScopeLock l(lock_, "BundlePayload::truncate");
00193     
00194     ASSERT(length <= length_);
00195     ASSERT(length <= rcvd_length_);
00196     length_ = length;
00197     rcvd_length_ = length;
00198     cur_offset_ = length;
00199 
00200     if (location_ == MEMORY) {
00201         data_.resize(length);
00202         ASSERT(data_.length() == length);
00203     }
00204 
00205     pin_file();
00206     file_.truncate(length);
00207     unpin_file();
00208 }
00209 
00210 //----------------------------------------------------------------------
00211 void
00212 BundlePayload::copy_file(oasys::FileIOClient* dst)
00213 {
00214     pin_file();
00215     file_.lseek(0, SEEK_SET);
00216     file_.copy_contents(dst, length());
00217     unpin_file();
00218 }
00219 
00220 //----------------------------------------------------------------------
00221 bool
00222 BundlePayload::replace_with_file(const char* path)
00223 {
00224     std::string payload_path = file_.path();
00225     file_.unlink();
00226     int err = ::link(path, payload_path.c_str());
00227     if (err == 0) {
00228         file_.set_path(payload_path);
00229         log_debug("replace_with_file: successfully created link to %s",
00230                   path);
00231         return true;
00232     }
00233 
00234     err = errno;
00235     if (err == EXDEV) {
00236         // copy the contents if they're on different filesystems
00237         log_debug("replace_with_file: link failed: %s", strerror(err));
00238         
00239         oasys::FileIOClient src;
00240         int fd = src.open(path, O_RDONLY, &err);
00241         if (fd < 0) {
00242             log_err("error opening path '%s' for reading: %s",
00243                     path, strerror(err));
00244             return false;
00245         }
00246         
00247         file_.set_path(payload_path);
00248         pin_file();
00249         src.copy_contents(&file_);
00250         unpin_file();
00251         src.close();
00252         return true;
00253     }
00254 
00255     log_err("error linking to path '%s': %s",
00256             path, strerror(err));
00257     return false;
00258 }
00259     
00260 //----------------------------------------------------------------------
00261 void
00262 BundlePayload::internal_write(const u_char* bp, size_t offset, size_t len)
00263 {
00264     // the caller should have pinned the fd
00265     
00266     ASSERT(file_.is_open());
00267     ASSERT(lock_->is_locked_by_me());
00268     ASSERT(location_ != NODATA && location_ != UNDETERMINED);
00269 
00270     if (location_ == MEMORY) {
00271 
00272         // case 1: appending new data
00273         if (offset == data_.length()) {
00274             data_.append((const char*)bp, len);
00275         }
00276         
00277         // case 2: adding data after an empty space, so need some
00278         // intermediate padding
00279         else if (offset > data_.length()) {
00280             data_.append(offset - data_.length(), '\0');
00281             data_.append((const char*)bp, len);
00282         }
00283 
00284         // case 3: fully overwriting data in the buffer
00285         else if ((offset + len) <= data_.length()) {
00286             data_.replace(offset, len, (const char*)bp, len);
00287         }
00288 
00289         // that should cover it all
00290         else {
00291             PANIC("unexpected case in internal_write: "
00292                   "data.length=%zu offset=%zu len=%zu",
00293                   data_.length(), offset, len);
00294         }
00295 
00296         // sanity check
00297         ASSERTF(data_.length() >= offset + len,
00298                 "length=%zu offset=%zu len=%zu",
00299                 data_.length(), offset, len);
00300     }
00301     
00302     // check if we need to seek
00303     if (cur_offset_ != offset) {
00304         file_.lseek(offset, SEEK_SET);
00305         cur_offset_ = offset;
00306     }
00307 
00308     file_.writeall((char*)bp, len);
00309 
00310     cur_offset_  += len;
00311     rcvd_length_ = std::max(rcvd_length_, offset + len);
00312 
00313     ASSERT(rcvd_length_ <= length_);
00314 }
00315 
00316 //----------------------------------------------------------------------
00317 void
00318 BundlePayload::set_data(const u_char* bp, size_t len)
00319 {
00320     oasys::ScopeLock l(lock_, "BundlePayload::set_data");
00321     
00322     ASSERT(rcvd_length_ == 0);
00323     set_length(len);
00324     
00325     pin_file();
00326     internal_write(bp, base_offset_, len);
00327     unpin_file();
00328 }
00329 
00330 //----------------------------------------------------------------------
00331 void
00332 BundlePayload::append_data(const u_char* bp, size_t len)
00333 {
00334     oasys::ScopeLock l(lock_, "BundlePayload::append_data");
00335     
00336     ASSERT(length_ > 0);
00337     pin_file();
00338     
00339     // check if we need to seek
00340     if (cur_offset_ != rcvd_length_) {
00341         file_.lseek(rcvd_length_, SEEK_SET);
00342         cur_offset_ = rcvd_length_;
00343     }
00344     
00345     internal_write(bp, base_offset_ + cur_offset_, len);
00346     unpin_file();
00347 }
00348 
00349 //----------------------------------------------------------------------
00350 void
00351 BundlePayload::write_data(const u_char* bp, size_t offset, size_t len)
00352 {
00353     oasys::ScopeLock l(lock_, "BundlePayload::write_data");
00354     
00355     ASSERT(length_ >= (len + offset));
00356     pin_file();
00357     internal_write(bp, base_offset_ + offset, len);
00358     unpin_file();
00359 }
00360 
00361 //----------------------------------------------------------------------
00362 void
00363 BundlePayload::write_data(BundlePayload* src, size_t src_offset,
00364                           size_t len, size_t dst_offset)
00365 {
00366     oasys::ScopeLock l(lock_, "BundlePayload::write_data");
00367 
00368     log_debug("write_data: file=%s length_=%zu src_offset=%zu dst_offset=%zu len %zu",
00369               file_.path(),
00370               length_, src_offset, dst_offset, len);
00371 
00372     ASSERT(length_       >= dst_offset + len);
00373     ASSERT(src->length() >= src_offset + len);
00374 
00375     // XXX/mho: todo - for cases where we're creating a fragment from
00376     // an existing bundle, make a hard link for the new fragment and
00377     // store the offset in base_offset_
00378 
00379     // XXX/demmer todo -- we should copy the payload in max-length chunks
00380     
00381     oasys::ScratchBuffer<u_char*, 1024> buf(len);
00382     const u_char* bp = src->read_data(src_offset, len, buf.buf());
00383 
00384     pin_file();
00385     internal_write(bp, dst_offset, len);
00386     unpin_file();
00387 }
00388 
00389 //----------------------------------------------------------------------
00390 const u_char*
00391 BundlePayload::read_data(size_t offset, size_t len, u_char* buf, int flags)
00392 {
00393     oasys::ScopeLock l(lock_, "BundlePayload::read_data");
00394     
00395     ASSERTF(length_ >= (offset + len),
00396             "length=%zu offset=%zu len=%zu",
00397             length_, offset, len);
00398 
00399     ASSERTF(rcvd_length_ >= (offset + len),
00400             "rcvd_length=%zu offset=%zu len=%zu",
00401             rcvd_length_, offset, len);
00402     
00403     if (location_ == MEMORY) {
00404         if (flags & FORCE_COPY) {
00405             memcpy(buf, data_.data() + offset, len);
00406             return buf;
00407         } else {
00408             return (u_char*)data_.data() + offset;
00409         }
00410         
00411     } else {
00412         ASSERT(buf);
00413 
00414         pin_file();
00415         
00416         // check if we need to seek first
00417         if (offset != cur_offset_) {
00418             file_.lseek(offset, SEEK_SET);
00419         }
00420         
00421         file_.readall((char*)buf, len);
00422         cur_offset_ = offset + len;
00423 
00424         unpin_file();
00425         
00426         return buf;
00427     }
00428 }
00429 
00430 
00431 } // namespace dtn

Generated on Thu Jun 7 16:56:48 2007 for DTN Reference Implementation by  doxygen 1.5.1