00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <errno.h>
00019 #include <sys/types.h>
00020 #include <sys/stat.h>
00021 #include <oasys/debug/DebugUtils.h>
00022 #include <oasys/thread/SpinLock.h>
00023 #include <oasys/util/ScratchBuffer.h>
00024 #include <oasys/util/StringBuffer.h>
00025
00026 #include "BundlePayload.h"
00027 #include "storage/BundleStore.h"
00028
00029 namespace dtn {
00030
00031
00032 size_t BundlePayload::mem_threshold_ = 16384;
00033 bool BundlePayload::test_no_remove_ = false;
00034
00035
00036 BundlePayload::BundlePayload(oasys::SpinLock* lock)
00037 : Logger("BundlePayload", "/dtn/bundle/payload"),
00038 location_(DISK), length_(0), rcvd_length_(0),
00039 cur_offset_(0), base_offset_(0), lock_(lock)
00040 {
00041 }
00042
00043
00044 void
00045 BundlePayload::init(int bundleid, location_t location)
00046 {
00047 location_ = location;
00048
00049 logpathf("/dtn/bundle/payload/%d", bundleid);
00050
00051
00052
00053 if (location == NODATA) {
00054 return;
00055 }
00056 BundleStore* bs = BundleStore::instance();
00057 oasys::StringBuffer path("%s/bundle_%d.dat",
00058 bs->payload_dir().c_str(), bundleid);
00059
00060 file_.logpathf("%s/file", logpath_);
00061
00062 int open_errno = 0;
00063 int err = file_.open(path.c_str(), O_EXCL | O_CREAT | O_RDWR,
00064 S_IRUSR | S_IWUSR, &open_errno);
00065
00066 if (err < 0 && open_errno == EEXIST)
00067 {
00068 log_err("payload file %s already exists: overwriting and retrying",
00069 path.c_str());
00070
00071 err = file_.open(path.c_str(), O_RDWR, S_IRUSR | S_IWUSR);
00072 }
00073
00074 if (err < 0)
00075 {
00076 log_crit("error opening payload file %s: %s",
00077 path.c_str(), strerror(errno));
00078 return;
00079 }
00080
00081 int fd = bs->payload_fdcache()->put_and_pin(file_.path(), file_.fd());
00082 if (fd != file_.fd()) {
00083 PANIC("duplicate entry in open fd cache");
00084 }
00085 unpin_file();
00086 }
00087
00088
00089 void
00090 BundlePayload::init_from_store(int bundleid)
00091 {
00092 location_ = DISK;
00093
00094 BundleStore* bs = BundleStore::instance();
00095 oasys::StringBuffer path("%s/bundle_%d.dat",
00096 bs->payload_dir().c_str(), bundleid);
00097
00098 file_.logpathf("%s/file", logpath_);
00099
00100 if (file_.open(path.c_str(), O_RDWR, S_IRUSR | S_IWUSR) < 0)
00101 {
00102 log_crit("error opening payload file %s: %s",
00103 path.c_str(), strerror(errno));
00104 return;
00105 }
00106
00107 int fd = bs->payload_fdcache()->put_and_pin(file_.path(), file_.fd());
00108 if (fd != file_.fd()) {
00109 PANIC("duplicate entry in open fd cache");
00110 }
00111 unpin_file();
00112 }
00113
00114
00115 BundlePayload::~BundlePayload()
00116 {
00117 if (location_ != NODATA && file_.is_open()) {
00118 BundleStore::instance()->payload_fdcache()->close(file_.path());
00119 file_.set_fd(-1);
00120
00121 if (!test_no_remove_)
00122 file_.unlink();
00123 }
00124 }
00125
00126
00127 void
00128 BundlePayload::serialize(oasys::SerializeAction* a)
00129 {
00130 a->process("length", (u_int32_t*)&length_);
00131 a->process("rcvd_length", (u_int32_t*)&rcvd_length_);
00132 a->process("base_offset", (u_int32_t*)&base_offset_);
00133 }
00134
00135
00136 void
00137 BundlePayload::set_length(size_t length, location_t new_location)
00138 {
00139 oasys::ScopeLock l(lock_, "BundlePayload::set_length");
00140
00141 length_ = length;
00142
00143 if (location_ == UNDETERMINED) {
00144 if (new_location == UNDETERMINED) {
00145 location_ = (length_ < mem_threshold_) ? MEMORY : DISK;
00146 } else {
00147 location_ = new_location;
00148 }
00149 }
00150
00151 ASSERT(location_ != UNDETERMINED);
00152 }
00153
00154
00155
00156 void
00157 BundlePayload::pin_file()
00158 {
00159 BundleStore* bs = BundleStore::instance();
00160 int fd = bs->payload_fdcache()->get_and_pin(file_.path());
00161
00162 if (fd == -1) {
00163 if (file_.reopen(O_RDWR) < 0) {
00164 log_err("error reopening file %s: %s",
00165 file_.path(), strerror(errno));
00166 return;
00167 }
00168
00169 cur_offset_ = 0;
00170
00171 int fd = bs->payload_fdcache()->put_and_pin(file_.path(), file_.fd());
00172 if (fd != file_.fd()) {
00173 PANIC("duplicate entry in open fd cache");
00174 }
00175
00176 } else {
00177 ASSERT(fd == file_.fd());
00178 }
00179 }
00180
00181
00182 void
00183 BundlePayload::unpin_file()
00184 {
00185 BundleStore::instance()->payload_fdcache()->unpin(file_.path());
00186 }
00187
00188
00189 void
00190 BundlePayload::truncate(size_t length)
00191 {
00192 oasys::ScopeLock l(lock_, "BundlePayload::truncate");
00193
00194 ASSERT(length <= length_);
00195 ASSERT(length <= rcvd_length_);
00196 length_ = length;
00197 rcvd_length_ = length;
00198 cur_offset_ = length;
00199
00200 if (location_ == MEMORY) {
00201 data_.resize(length);
00202 ASSERT(data_.length() == length);
00203 }
00204
00205 pin_file();
00206 file_.truncate(length);
00207 unpin_file();
00208 }
00209
00210
00211 void
00212 BundlePayload::copy_file(oasys::FileIOClient* dst)
00213 {
00214 pin_file();
00215 file_.lseek(0, SEEK_SET);
00216 file_.copy_contents(dst, length());
00217 unpin_file();
00218 }
00219
00220
00221 bool
00222 BundlePayload::replace_with_file(const char* path)
00223 {
00224 std::string payload_path = file_.path();
00225 file_.unlink();
00226 int err = ::link(path, payload_path.c_str());
00227 if (err == 0) {
00228 file_.set_path(payload_path);
00229 log_debug("replace_with_file: successfully created link to %s",
00230 path);
00231 return true;
00232 }
00233
00234 err = errno;
00235 if (err == EXDEV) {
00236
00237 log_debug("replace_with_file: link failed: %s", strerror(err));
00238
00239 oasys::FileIOClient src;
00240 int fd = src.open(path, O_RDONLY, &err);
00241 if (fd < 0) {
00242 log_err("error opening path '%s' for reading: %s",
00243 path, strerror(err));
00244 return false;
00245 }
00246
00247 file_.set_path(payload_path);
00248 pin_file();
00249 src.copy_contents(&file_);
00250 unpin_file();
00251 src.close();
00252 return true;
00253 }
00254
00255 log_err("error linking to path '%s': %s",
00256 path, strerror(err));
00257 return false;
00258 }
00259
00260
00261 void
00262 BundlePayload::internal_write(const u_char* bp, size_t offset, size_t len)
00263 {
00264
00265
00266 ASSERT(file_.is_open());
00267 ASSERT(lock_->is_locked_by_me());
00268 ASSERT(location_ != NODATA && location_ != UNDETERMINED);
00269
00270 if (location_ == MEMORY) {
00271
00272
00273 if (offset == data_.length()) {
00274 data_.append((const char*)bp, len);
00275 }
00276
00277
00278
00279 else if (offset > data_.length()) {
00280 data_.append(offset - data_.length(), '\0');
00281 data_.append((const char*)bp, len);
00282 }
00283
00284
00285 else if ((offset + len) <= data_.length()) {
00286 data_.replace(offset, len, (const char*)bp, len);
00287 }
00288
00289
00290 else {
00291 PANIC("unexpected case in internal_write: "
00292 "data.length=%zu offset=%zu len=%zu",
00293 data_.length(), offset, len);
00294 }
00295
00296
00297 ASSERTF(data_.length() >= offset + len,
00298 "length=%zu offset=%zu len=%zu",
00299 data_.length(), offset, len);
00300 }
00301
00302
00303 if (cur_offset_ != offset) {
00304 file_.lseek(offset, SEEK_SET);
00305 cur_offset_ = offset;
00306 }
00307
00308 file_.writeall((char*)bp, len);
00309
00310 cur_offset_ += len;
00311 rcvd_length_ = std::max(rcvd_length_, offset + len);
00312
00313 ASSERT(rcvd_length_ <= length_);
00314 }
00315
00316
00317 void
00318 BundlePayload::set_data(const u_char* bp, size_t len)
00319 {
00320 oasys::ScopeLock l(lock_, "BundlePayload::set_data");
00321
00322 ASSERT(rcvd_length_ == 0);
00323 set_length(len);
00324
00325 pin_file();
00326 internal_write(bp, base_offset_, len);
00327 unpin_file();
00328 }
00329
00330
00331 void
00332 BundlePayload::append_data(const u_char* bp, size_t len)
00333 {
00334 oasys::ScopeLock l(lock_, "BundlePayload::append_data");
00335
00336 ASSERT(length_ > 0);
00337 pin_file();
00338
00339
00340 if (cur_offset_ != rcvd_length_) {
00341 file_.lseek(rcvd_length_, SEEK_SET);
00342 cur_offset_ = rcvd_length_;
00343 }
00344
00345 internal_write(bp, base_offset_ + cur_offset_, len);
00346 unpin_file();
00347 }
00348
00349
00350 void
00351 BundlePayload::write_data(const u_char* bp, size_t offset, size_t len)
00352 {
00353 oasys::ScopeLock l(lock_, "BundlePayload::write_data");
00354
00355 ASSERT(length_ >= (len + offset));
00356 pin_file();
00357 internal_write(bp, base_offset_ + offset, len);
00358 unpin_file();
00359 }
00360
00361
00362 void
00363 BundlePayload::write_data(BundlePayload* src, size_t src_offset,
00364 size_t len, size_t dst_offset)
00365 {
00366 oasys::ScopeLock l(lock_, "BundlePayload::write_data");
00367
00368 log_debug("write_data: file=%s length_=%zu src_offset=%zu dst_offset=%zu len %zu",
00369 file_.path(),
00370 length_, src_offset, dst_offset, len);
00371
00372 ASSERT(length_ >= dst_offset + len);
00373 ASSERT(src->length() >= src_offset + len);
00374
00375
00376
00377
00378
00379
00380
00381 oasys::ScratchBuffer<u_char*, 1024> buf(len);
00382 const u_char* bp = src->read_data(src_offset, len, buf.buf());
00383
00384 pin_file();
00385 internal_write(bp, dst_offset, len);
00386 unpin_file();
00387 }
00388
00389
00390 const u_char*
00391 BundlePayload::read_data(size_t offset, size_t len, u_char* buf, int flags)
00392 {
00393 oasys::ScopeLock l(lock_, "BundlePayload::read_data");
00394
00395 ASSERTF(length_ >= (offset + len),
00396 "length=%zu offset=%zu len=%zu",
00397 length_, offset, len);
00398
00399 ASSERTF(rcvd_length_ >= (offset + len),
00400 "rcvd_length=%zu offset=%zu len=%zu",
00401 rcvd_length_, offset, len);
00402
00403 if (location_ == MEMORY) {
00404 if (flags & FORCE_COPY) {
00405 memcpy(buf, data_.data() + offset, len);
00406 return buf;
00407 } else {
00408 return (u_char*)data_.data() + offset;
00409 }
00410
00411 } else {
00412 ASSERT(buf);
00413
00414 pin_file();
00415
00416
00417 if (offset != cur_offset_) {
00418 file_.lseek(offset, SEEK_SET);
00419 }
00420
00421 file_.readall((char*)buf, len);
00422 cur_offset_ = offset + len;
00423
00424 unpin_file();
00425
00426 return buf;
00427 }
00428 }
00429
00430
00431 }