00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include <algorithm>
00040 #include <stdlib.h>
00041 #include <oasys/thread/SpinLock.h>
00042
00043 #include "Bundle.h"
00044 #include "BundleList.h"
00045 #include "BundleTimestamp.h"
00046
00047 namespace dtn {
00048
00049
00050
00051
00052
00053
00054
00055 BundleList::BundleList(const std::string& name)
00056 : Logger("BundleList", "/dtn/bundle/list/%s", name.c_str()),
00057 name_(name), lock_(new oasys::SpinLock()), notifier_(NULL)
00058 {
00059 }
00060
00061 BundleList::~BundleList()
00062 {
00063 clear();
00064 delete lock_;
00065 }
00066
00070 BundleRef
00071 BundleList::front()
00072 {
00073 oasys::ScopeLock l(lock_, "BundleList::front");
00074
00075 BundleRef ret("BundleList::front() temporary");
00076 if (list_.empty())
00077 return ret;
00078
00079 ret = list_.front();
00080 return ret;
00081 }
00082
00086 BundleRef
00087 BundleList::back()
00088 {
00089 oasys::ScopeLock l(lock_, "BundleList::back");
00090
00091 BundleRef ret("BundleList::back() temporary");
00092 if (list_.empty())
00093 return ret;
00094
00095 ret = list_.back();
00096 return ret;
00097 }
00098
00103 void
00104 BundleList::add_bundle(Bundle* b, const iterator& pos)
00105 {
00106 ASSERT(lock_->is_locked_by_me());
00107 ASSERT(b->lock_.is_locked_by_me());
00108
00109 if (b->mappings_.insert(this).second == false) {
00110 log_err("ERROR in add bundle: "
00111 "bundle id %d already on list [%s]",
00112 b->bundleid_, name_.c_str());
00113
00114 return;
00115 }
00116
00117 list_.insert(pos, b);
00118 b->add_ref("bundle_list", name_.c_str());
00119 if (notifier_ != 0) {
00120 notifier_->notify();
00121 }
00122
00123 log_debug("bundle id %d add mapping [%s] to list %p",
00124 b->bundleid_, name_.c_str(), this);
00125 }
00126
00130 void
00131 BundleList::push_front(Bundle* b)
00132 {
00133 oasys::ScopeLock l(lock_, "BundleList::push_front");
00134 oasys::ScopeLock bl(&b->lock_, "BundleList::push_front");
00135 add_bundle(b, list_.begin());
00136 }
00137
00141 void
00142 BundleList::push_back(Bundle* b)
00143 {
00144 oasys::ScopeLock l(lock_, "BundleList::push_back");
00145 oasys::ScopeLock bl(&b->lock_, "BundleList::push_back");
00146 add_bundle(b, list_.end());
00147 }
00148
00152 void
00153 BundleList::insert_sorted(Bundle* b, sort_order_t sort_order)
00154 {
00155 iterator iter;
00156 oasys::ScopeLock l(lock_, "BundleList::insert_sorted");
00157 oasys::ScopeLock bl(&b->lock_, "BundleList::insert_sorted");
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168 for (iter = list_.begin(); iter != list_.end(); ++iter)
00169 {
00170 if (sort_order == SORT_FRAG_OFFSET) {
00171 if ((*iter)->frag_offset_ > b->frag_offset_) {
00172 break;
00173 }
00174
00175 } else if (sort_order == SORT_PRIORITY) {
00176 NOTIMPLEMENTED;
00177
00178 } else {
00179 PANIC("invalid value for sort order %d", sort_order);
00180 }
00181 }
00182
00183 add_bundle(b, iter);
00184 }
00185
00190 void
00191 BundleList::insert_random(Bundle* b)
00192 {
00193 iterator iter;
00194 oasys::ScopeLock l(lock_, "BundleList::insert_random");
00195 oasys::ScopeLock bl(&b->lock_, "BundleList::insert_random");
00196
00197 iter = begin();
00198 int location = 0;
00199 if (size() != 0) {
00200 location = random() % size();
00201 }
00202
00203 log_info("insert_random at %d/%zu", location, size());
00204
00205 for (int i = 0; i < location; ++i) {
00206 ++iter;
00207 }
00208
00209 add_bundle(b, iter);
00210 }
00211
00215 Bundle*
00216 BundleList::del_bundle(const iterator& pos, bool used_notifier)
00217 {
00218 Bundle* b = *pos;
00219 ASSERT(lock_->is_locked_by_me());
00220
00221
00222 oasys::ScopeLock l(& b->lock_, "BundleList::del_bundle");
00223
00224
00225 log_debug("bundle id %d del_bundle: deleting mapping [%s]",
00226 b->bundleid_, name_.c_str());
00227
00228 Bundle::BundleMappings::iterator mapping_pos = b->mappings_.find(this);
00229 if (mapping_pos == b->mappings_.end()) {
00230 log_err("ERROR in del bundle: "
00231 "bundle id %d not on list [%s]",
00232 b->bundleid_, name_.c_str());
00233 } else {
00234 b->mappings_.erase(mapping_pos);
00235 }
00236
00237
00238 list_.erase(pos);
00239
00240
00241 if (notifier_ && !used_notifier) {
00242 notifier_->drain_pipe(1);
00243 }
00244
00245
00246
00247
00248 return b;
00249 }
00250
00254 BundleRef
00255 BundleList::pop_front(bool used_notifier)
00256 {
00257 oasys::ScopeLock l(lock_, "pop_front");
00258
00259 BundleRef ret("BundleList::pop_front() temporary");
00260
00261 if (list_.empty()) {
00262 return ret;
00263 }
00264
00265 ASSERT(list_.size() != 0);
00266
00267
00268
00269 ret = del_bundle(list_.begin(), used_notifier);
00270 ret.object()->del_ref("bundle_list", name_.c_str());
00271 return ret;
00272 }
00273
00277 BundleRef
00278 BundleList::pop_back(bool used_notifier)
00279 {
00280 oasys::ScopeLock l(lock_, "BundleList::pop_back");
00281
00282 BundleRef ret("BundleList::pop_back() temporary");
00283
00284 if (list_.empty()) {
00285 return ret;
00286 }
00287
00288
00289
00290 ret = del_bundle(--list_.end(), used_notifier);
00291 ret->del_ref("bundle_list", name_.c_str());
00292 return ret;
00293 }
00294
00302 bool
00303 BundleList::erase(Bundle* bundle, bool used_notifier)
00304 {
00305 if (bundle == NULL) {
00306 return false;
00307 }
00308
00309 ASSERTF(!bundle->lock_.is_locked_by_me(),
00310 "bundle cannot be locked in erase due to potential deadlock");
00311
00312 oasys::ScopeLock l(lock_, "BundleList::erase");
00313
00314 iterator pos = std::find(begin(), end(), bundle);
00315 if (pos == end()) {
00316 return false;
00317 }
00318
00319 del_bundle(pos, used_notifier);
00320
00321 bundle->del_ref("bundle_list", name_.c_str());
00322 return true;
00323 }
00324
00330 bool
00331 BundleList::contains(Bundle* bundle)
00332 {
00333 if (bundle == NULL) {
00334 return false;
00335 }
00336
00337 bool ret = bundle->is_queued_on(this);
00338
00339 #define DEBUG_MAPPINGS
00340 #ifdef DEBUG_MAPPINGS
00341 oasys::ScopeLock l(lock_, "BundleList::contains");
00342 bool ret2 = (std::find(begin(), end(), bundle) != end());
00343 ASSERT(ret == ret2);
00344 #endif
00345
00346 return ret;
00347 }
00348
00354 BundleRef
00355 BundleList::find(u_int32_t bundle_id)
00356 {
00357 oasys::ScopeLock l(lock_, "BundleList::find");
00358 BundleRef ret("BundleList::find() temporary");
00359 for (iterator iter = begin(); iter != end(); ++iter) {
00360 if ((*iter)->bundleid_ == bundle_id) {
00361 ret = *iter;
00362 return ret;
00363 }
00364 }
00365
00366 return ret;
00367 }
00368
00375 BundleRef
00376 BundleList::find(const EndpointID& source_eid,
00377 const BundleTimestamp& creation_ts)
00378 {
00379 oasys::ScopeLock l(lock_, "BundleList::find");
00380 BundleRef ret("BundleList::find() temporary");
00381
00382 for (iterator iter = begin(); iter != end(); ++iter) {
00383 if ((*iter)->source_.equals(source_eid) &&
00384 (*iter)->creation_ts_.seconds_ == creation_ts.seconds_ &&
00385 (*iter)->creation_ts_.seqno_ == creation_ts.seqno_)
00386 {
00387 ret = *iter;
00388 return ret;
00389 }
00390 }
00391
00392 return ret;
00393 }
00394
00398 void
00399 BundleList::move_contents(BundleList* other)
00400 {
00401 oasys::ScopeLock l1(lock_, "BundleList::move_contents");
00402 oasys::ScopeLock l2(other->lock_, "BundleList::move_contents");
00403
00404 BundleRef b("BundleList::move_contents temporary");
00405 while (!list_.empty()) {
00406 b = pop_front();
00407 other->push_back(b.object());
00408 }
00409 }
00410
00414 void
00415 BundleList::clear()
00416 {
00417 oasys::ScopeLock l(lock_, "BundleList::clear");
00418
00419 while (!list_.empty()) {
00420 BundleRef b("BundleList::clear temporary");
00421 b = pop_front();
00422 }
00423 }
00424
00425
00429 size_t
00430 BundleList::size() const
00431 {
00432 oasys::ScopeLock l(lock_, "BundleList::size");
00433 return list_.size();
00434 }
00435
00441 BundleList::iterator
00442 BundleList::begin()
00443 {
00444 if (!lock_->is_locked_by_me())
00445 PANIC("Must lock BundleList before using iterator");
00446
00447 return list_.begin();
00448 }
00449
00455 BundleList::iterator
00456 BundleList::end()
00457 {
00458 if (!lock_->is_locked_by_me())
00459 PANIC("Must lock BundleList before using iterator");
00460
00461 return list_.end();
00462 }
00463
00469 BundleList::const_iterator
00470 BundleList::begin() const
00471 {
00472 if (!lock_->is_locked_by_me())
00473 PANIC("Must lock BundleList before using iterator");
00474
00475 return list_.begin();
00476 }
00477
00478
00484 BundleList::const_iterator
00485 BundleList::end() const
00486 {
00487 if (!lock_->is_locked_by_me())
00488 PANIC("Must lock BundleList before using iterator");
00489
00490 return list_.end();
00491 }
00492
00493 BlockingBundleList::BlockingBundleList(const std::string& name)
00494 : BundleList(name)
00495 {
00496 notifier_ = new oasys::Notifier(logpath_);
00497 }
00498
00499 BlockingBundleList::~BlockingBundleList()
00500 {
00501 delete notifier_;
00502 notifier_ = NULL;
00503 }
00504
00512 BundleRef
00513 BlockingBundleList::pop_blocking(int timeout)
00514 {
00515 ASSERT(notifier_);
00516
00517 log_debug("pop_blocking on list %p [%s]",
00518 this, name_.c_str());
00519
00520 lock_->lock("BlockingBundleList::pop_blocking");
00521
00522 bool used_wait;
00523 if (list_.empty()) {
00524 used_wait = true;
00525 bool notified = notifier_->wait(lock_, timeout);
00526 ASSERT(lock_->is_locked_by_me());
00527
00528
00529
00530 if (!notified) {
00531 lock_->unlock();
00532 log_debug("pop_blocking timeout on list %p", this);
00533
00534 return BundleRef("BlockingBundleList::pop_blocking temporary");
00535 }
00536 } else {
00537 used_wait = false;
00538 }
00539
00540
00541
00542 ASSERT(!list_.empty());
00543
00544 BundleRef ret("BlockingBundleList::pop_blocking temporary");
00545 ret = pop_front(used_wait);
00546
00547 lock_->unlock();
00548
00549 log_debug("pop_blocking got bundle %p from list %p",
00550 ret.object(), this);
00551
00552 return ret;
00553 }
00554
00555
00556 }