00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include <errno.h>
00040 #include <sys/types.h>
00041 #include <sys/stat.h>
00042 #include <oasys/debug/DebugUtils.h>
00043 #include <oasys/thread/SpinLock.h>
00044 #include <oasys/util/StringBuffer.h>
00045
00046 #include "BundlePayload.h"
00047
00048 namespace dtn {
00049
00050
00051
00052
00053 std::string BundlePayload::payloaddir_;
00054 size_t BundlePayload::mem_threshold_;
00055 bool BundlePayload::test_no_remove_;
00056
00060 BundlePayload::BundlePayload(oasys::SpinLock* lock)
00061 : Logger("BundlePayload", "/dtn/bundle/payload"),
00062 location_(DISK), length_(0), rcvd_length_(0), file_(NULL),
00063 cur_offset_(0), base_offset_(0), lock_(lock)
00064 {
00065 }
00066
00070 void
00071 BundlePayload::init(int bundleid, location_t location)
00072 {
00073 location_ = location;
00074
00075 logpathf("/dtn/bundle/payload/%d", bundleid);
00076
00077
00078
00079 if (location != NODATA) {
00080 oasys::StringBuffer path("%s/bundle_%d.dat",
00081 BundlePayload::payloaddir_.c_str(), bundleid);
00082 file_ = new oasys::FileIOClient();
00083 file_->logpathf("%s/file", logpath_);
00084 int open_errno = 0;
00085 int err = file_->open(path.c_str(), O_EXCL | O_CREAT | O_RDWR,
00086 S_IRUSR | S_IWUSR, &open_errno);
00087
00088 if (err < 0 && open_errno == EEXIST)
00089 {
00090 log_err("payload file %s already exists: overwriting and retrying",
00091 path.c_str());
00092
00093 err = file_->open(path.c_str(), O_RDWR, S_IRUSR | S_IWUSR);
00094 }
00095
00096 if (err < 0)
00097 {
00098 log_crit("error opening payload file %s: %s",
00099 path.c_str(), strerror(errno));
00100 return;
00101 }
00102 }
00103 }
00104
00108 void
00109 BundlePayload::init_from_store(int bundleid)
00110 {
00111 location_ = DISK;
00112
00113 oasys::StringBuffer path("%s/bundle_%d.dat",
00114 BundlePayload::payloaddir_.c_str(), bundleid);
00115 file_ = new oasys::FileIOClient();
00116 file_->logpathf("/bundle/payload/%d", bundleid);
00117 if (file_->open(path.c_str(),
00118 O_RDWR, S_IRUSR | S_IWUSR) < 0)
00119 {
00120 log_crit("error opening payload file %s: %s",
00121 path.c_str(), strerror(errno));
00122 return;
00123 }
00124 file_->close();
00125 }
00126
00130 BundlePayload::~BundlePayload()
00131 {
00132 if (file_) {
00133 if (!test_no_remove_)
00134 file_->unlink();
00135 delete file_;
00136 file_ = NULL;
00137 }
00138 }
00139
00143 void
00144 BundlePayload::serialize(oasys::SerializeAction* a)
00145 {
00146 a->process("filename", &fname_);
00147 a->process("length", (u_int32_t*)&length_);
00148 a->process("rcvd_length", (u_int32_t*)&rcvd_length_);
00149 a->process("base_offset", (u_int32_t*)&base_offset_);
00150 }
00151
00157 void
00158 BundlePayload::set_length(size_t length, location_t new_location)
00159 {
00160 oasys::ScopeLock l(lock_, "BundlePayload::set_length");
00161
00162 length_ = length;
00163
00164 if (location_ == UNDETERMINED) {
00165 if (new_location == UNDETERMINED) {
00166 location_ = (length_ < mem_threshold_) ? MEMORY : DISK;
00167 } else {
00168 location_ = new_location;
00169 }
00170 }
00171
00172 ASSERT(location_ != UNDETERMINED);
00173 }
00174
00178 void
00179 BundlePayload::truncate(size_t length)
00180 {
00181 oasys::ScopeLock l(lock_, "BundlePayload::truncate");
00182
00183 ASSERT(length <= length_);
00184 ASSERT(length <= rcvd_length_);
00185 length_ = length;
00186 rcvd_length_ = length;
00187 cur_offset_ = length;
00188
00189 if (location_ == MEMORY) {
00190 data_.resize(length);
00191 ASSERT(data_.length() == length);
00192 }
00193
00194 reopen_file();
00195 file_->truncate(length);
00196 close_file();
00197 }
00198
00202 void
00203 BundlePayload::reopen_file()
00204 {
00205 if (!file_->is_open()) {
00206 if (file_->reopen(O_RDWR) < 0) {
00207 log_err("error reopening file %s: %s",
00208 file_->path(), strerror(errno));
00209 return;
00210 }
00211
00212 cur_offset_ = 0;
00213 }
00214 }
00215
00219 void
00220 BundlePayload::close_file()
00221 {
00222 if (file_->is_open()) {
00223 file_->close();
00224 }
00225 }
00226
00230 void
00231 BundlePayload::copy_file(oasys::FileIOClient* dst)
00232 {
00233 if (! is_file_open()) {
00234 reopen_file();
00235 } else {
00236 file_->lseek(0, SEEK_SET);
00237 }
00238
00239 file_->copy_contents(length(), dst);
00240
00241 close_file();
00242 }
00243
00247 void
00248 BundlePayload::internal_write(const u_char* bp, size_t offset, size_t len)
00249 {
00250 ASSERT(lock_->is_locked_by_me());
00251 ASSERT(file_->is_open());
00252 ASSERT(location_ != NODATA && location_ != UNDETERMINED);
00253
00254 if (location_ == MEMORY) {
00255
00256
00257 if (offset == data_.length()) {
00258 data_.append((const char*)bp, len);
00259 }
00260
00261
00262
00263 else if (offset > data_.length()) {
00264 data_.append(offset - data_.length(), '\0');
00265 data_.append((const char*)bp, len);
00266 }
00267
00268
00269 else if ((offset + len) <= data_.length()) {
00270 data_.replace(offset, len, (const char*)bp, len);
00271 }
00272
00273
00274 else {
00275 PANIC("unexpected case in internal_write: "
00276 "data.length=%zu offset=%zu len=%zu",
00277 data_.length(), offset, len);
00278 }
00279
00280
00281 ASSERTF(data_.length() >= offset + len,
00282 "length=%zu offset=%zu len=%zu",
00283 data_.length(), offset, len);
00284 }
00285
00286
00287 if (cur_offset_ != offset) {
00288 file_->lseek(offset, SEEK_SET);
00289 cur_offset_ = offset;
00290 }
00291
00292 file_->writeall((char*)bp, len);
00293
00294 cur_offset_ += len;
00295 rcvd_length_ = std::max(rcvd_length_, offset + len);
00296
00297 ASSERT(rcvd_length_ <= length_);
00298 }
00299
00304 void
00305 BundlePayload::set_data(const u_char* bp, size_t len)
00306 {
00307 oasys::ScopeLock l(lock_, "BundlePayload::set_data");
00308
00309 ASSERT(rcvd_length_ == 0);
00310 set_length(len);
00311
00312 reopen_file();
00313 internal_write(bp, base_offset_, len);
00314 close_file();
00315 }
00316
00321 void
00322 BundlePayload::append_data(const u_char* bp, size_t len)
00323 {
00324 oasys::ScopeLock l(lock_, "BundlePayload::append_data");
00325
00326 ASSERT(length_ > 0);
00327 ASSERT(file_->is_open());
00328
00329
00330 if (cur_offset_ != rcvd_length_) {
00331 file_->lseek(rcvd_length_, SEEK_SET);
00332 cur_offset_ = rcvd_length_;
00333 }
00334
00335 internal_write(bp, base_offset_ + cur_offset_, len);
00336 }
00337
00342 void
00343 BundlePayload::write_data(const u_char* bp, size_t offset, size_t len)
00344 {
00345 oasys::ScopeLock l(lock_, "BundlePayload::write_data");
00346
00347 ASSERT(length_ >= (len + offset));
00348 ASSERT(file_->is_open());
00349
00350 internal_write(bp, base_offset_ + offset, len);
00351 }
00352
00358 void
00359 BundlePayload::write_data(BundlePayload* src, size_t src_offset,
00360 size_t len, size_t dst_offset)
00361 {
00362 oasys::ScopeLock l(lock_, "BundlePayload::write_data");
00363
00364 log_debug("write_data: file=%s length_=%zu src_offset=%zu dst_offset=%zu len %zu",
00365 file_->path(),
00366 length_, src_offset, dst_offset, len);
00367
00368 ASSERT(length_ >= dst_offset + len);
00369 ASSERT(src->length() >= src_offset + len);
00370 ASSERT(file_->is_open());
00371
00372
00373
00374
00375 u_char buf[len];
00376 const u_char* bp = src->read_data(src_offset, len, buf, KEEP_FILE_OPEN);
00377 internal_write(bp, dst_offset, len);
00378 }
00379
00388 const u_char*
00389 BundlePayload::read_data(size_t offset, size_t len, u_char* buf, int flags)
00390 {
00391 oasys::ScopeLock l(lock_, "BundlePayload::read_data");
00392
00393 ASSERTF(length_ >= (offset + len),
00394 "length=%zu offset=%zu len=%zu",
00395 length_, offset, len);
00396
00397 ASSERTF(rcvd_length_ >= (offset + len),
00398 "rcvd_length=%zu offset=%zu len=%zu",
00399 rcvd_length_, offset, len);
00400
00401 if (location_ == MEMORY) {
00402 if (flags & FORCE_COPY) {
00403 memcpy(buf, data_.data() + offset, len);
00404 return buf;
00405 } else {
00406 return (u_char*)data_.data() + offset;
00407 }
00408
00409 } else {
00410 ASSERT(buf);
00411
00412 reopen_file();
00413
00414
00415 if (offset != cur_offset_) {
00416 file_->lseek(offset, SEEK_SET);
00417 }
00418
00419 file_->readall((char*)buf, len);
00420 cur_offset_ = offset + len;
00421
00422 if (! (flags & KEEP_FILE_OPEN))
00423 close_file();
00424
00425 return buf;
00426 }
00427 }
00428
00429
00430 }