00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <algorithm>
00019 #include <stdlib.h>
00020 #include <oasys/thread/SpinLock.h>
00021
00022 #include "Bundle.h"
00023 #include "BundleList.h"
00024 #include "BundleTimestamp.h"
00025
00026 namespace dtn {
00027
00028
00029
00030
00031 BundleList::BundleList(const std::string& name)
00032 : Logger("BundleList", "/dtn/bundle/list/%s", name.c_str()),
00033 name_(name), lock_(new oasys::SpinLock()), notifier_(NULL)
00034 {
00035 }
00036
00037 void
00038 BundleList::set_name(const std::string& name)
00039 {
00040 name_ = name;
00041 logpathf("/dtn/bundle/list/%s", name.c_str());
00042 }
00043
00044 BundleList::~BundleList()
00045 {
00046 clear();
00047 delete lock_;
00048 }
00049
00053 BundleRef
00054 BundleList::front()
00055 {
00056 oasys::ScopeLock l(lock_, "BundleList::front");
00057
00058 BundleRef ret("BundleList::front() temporary");
00059 if (list_.empty())
00060 return ret;
00061
00062 ret = list_.front();
00063 return ret;
00064 }
00065
00069 BundleRef
00070 BundleList::back()
00071 {
00072 oasys::ScopeLock l(lock_, "BundleList::back");
00073
00074 BundleRef ret("BundleList::back() temporary");
00075 if (list_.empty())
00076 return ret;
00077
00078 ret = list_.back();
00079 return ret;
00080 }
00081
00086 void
00087 BundleList::add_bundle(Bundle* b, const iterator& pos)
00088 {
00089 ASSERT(lock_->is_locked_by_me());
00090 ASSERT(b->lock_.is_locked_by_me());
00091
00092 if (b->mappings_.insert(this).second == false) {
00093 log_err("ERROR in add bundle: "
00094 "bundle id %d already on list [%s]",
00095 b->bundleid_, name_.c_str());
00096
00097 return;
00098 }
00099
00100 list_.insert(pos, b);
00101 b->add_ref("bundle_list", name_.c_str());
00102 if (notifier_ != 0) {
00103 notifier_->notify();
00104 }
00105
00106 log_debug("bundle id %d add mapping [%s] to list %p",
00107 b->bundleid_, name_.c_str(), this);
00108 }
00109
00113 void
00114 BundleList::push_front(Bundle* b)
00115 {
00116 oasys::ScopeLock l(lock_, "BundleList::push_front");
00117 oasys::ScopeLock bl(&b->lock_, "BundleList::push_front");
00118 add_bundle(b, list_.begin());
00119 }
00120
00124 void
00125 BundleList::push_back(Bundle* b)
00126 {
00127 oasys::ScopeLock l(lock_, "BundleList::push_back");
00128 oasys::ScopeLock bl(&b->lock_, "BundleList::push_back");
00129 add_bundle(b, list_.end());
00130 }
00131
00135 void
00136 BundleList::insert_sorted(Bundle* b, sort_order_t sort_order)
00137 {
00138 iterator iter;
00139 oasys::ScopeLock l(lock_, "BundleList::insert_sorted");
00140 oasys::ScopeLock bl(&b->lock_, "BundleList::insert_sorted");
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151 for (iter = list_.begin(); iter != list_.end(); ++iter)
00152 {
00153 if (sort_order == SORT_FRAG_OFFSET) {
00154 if ((*iter)->frag_offset_ > b->frag_offset_) {
00155 break;
00156 }
00157
00158 } else if (sort_order == SORT_PRIORITY) {
00159 NOTIMPLEMENTED;
00160
00161 } else {
00162 PANIC("invalid value for sort order %d", sort_order);
00163 }
00164 }
00165
00166 add_bundle(b, iter);
00167 }
00168
00173 void
00174 BundleList::insert_random(Bundle* b)
00175 {
00176 iterator iter;
00177 oasys::ScopeLock l(lock_, "BundleList::insert_random");
00178 oasys::ScopeLock bl(&b->lock_, "BundleList::insert_random");
00179
00180 iter = begin();
00181 int location = 0;
00182 if (size() != 0) {
00183 location = random() % size();
00184 }
00185
00186 log_info("insert_random at %d/%zu", location, size());
00187
00188 for (int i = 0; i < location; ++i) {
00189 ++iter;
00190 }
00191
00192 add_bundle(b, iter);
00193 }
00194
00198 Bundle*
00199 BundleList::del_bundle(const iterator& pos, bool used_notifier)
00200 {
00201 Bundle* b = *pos;
00202 ASSERT(lock_->is_locked_by_me());
00203
00204
00205 oasys::ScopeLock l(& b->lock_, "BundleList::del_bundle");
00206
00207
00208 log_debug("bundle id %d del_bundle: deleting mapping [%s]",
00209 b->bundleid_, name_.c_str());
00210
00211 Bundle::BundleMappings::iterator mapping_pos = b->mappings_.find(this);
00212 if (mapping_pos == b->mappings_.end()) {
00213 log_err("ERROR in del bundle: "
00214 "bundle id %d not on list [%s]",
00215 b->bundleid_, name_.c_str());
00216 } else {
00217 b->mappings_.erase(mapping_pos);
00218 }
00219
00220
00221 list_.erase(pos);
00222
00223
00224 if (notifier_ && !used_notifier) {
00225 notifier_->drain_pipe(1);
00226 }
00227
00228
00229
00230
00231 return b;
00232 }
00233
00237 BundleRef
00238 BundleList::pop_front(bool used_notifier)
00239 {
00240 oasys::ScopeLock l(lock_, "pop_front");
00241
00242 BundleRef ret("BundleList::pop_front() temporary");
00243
00244 if (list_.empty()) {
00245 return ret;
00246 }
00247
00248 ASSERT(list_.size() != 0);
00249
00250
00251
00252 ret = del_bundle(list_.begin(), used_notifier);
00253 ret.object()->del_ref("bundle_list", name_.c_str());
00254 return ret;
00255 }
00256
00260 BundleRef
00261 BundleList::pop_back(bool used_notifier)
00262 {
00263 oasys::ScopeLock l(lock_, "BundleList::pop_back");
00264
00265 BundleRef ret("BundleList::pop_back() temporary");
00266
00267 if (list_.empty()) {
00268 return ret;
00269 }
00270
00271
00272
00273 ret = del_bundle(--list_.end(), used_notifier);
00274 ret->del_ref("bundle_list", name_.c_str());
00275 return ret;
00276 }
00277
00285 bool
00286 BundleList::erase(Bundle* bundle, bool used_notifier)
00287 {
00288 if (bundle == NULL) {
00289 return false;
00290 }
00291
00292 ASSERTF(!bundle->lock_.is_locked_by_me(),
00293 "bundle cannot be locked in erase due to potential deadlock");
00294
00295 oasys::ScopeLock l(lock_, "BundleList::erase");
00296
00297 iterator pos = std::find(begin(), end(), bundle);
00298 if (pos == end()) {
00299 return false;
00300 }
00301
00302 del_bundle(pos, used_notifier);
00303
00304 bundle->del_ref("bundle_list", name_.c_str());
00305 return true;
00306 }
00307
00313 bool
00314 BundleList::contains(Bundle* bundle)
00315 {
00316 if (bundle == NULL) {
00317 return false;
00318 }
00319
00320 bool ret = bundle->is_queued_on(this);
00321
00322 #define DEBUG_MAPPINGS
00323 #ifdef DEBUG_MAPPINGS
00324 oasys::ScopeLock l(lock_, "BundleList::contains");
00325 bool ret2 = (std::find(begin(), end(), bundle) != end());
00326 ASSERT(ret == ret2);
00327 #endif
00328
00329 return ret;
00330 }
00331
00337 BundleRef
00338 BundleList::find(u_int32_t bundle_id)
00339 {
00340 oasys::ScopeLock l(lock_, "BundleList::find");
00341 BundleRef ret("BundleList::find() temporary");
00342 for (iterator iter = begin(); iter != end(); ++iter) {
00343 if ((*iter)->bundleid_ == bundle_id) {
00344 ret = *iter;
00345 return ret;
00346 }
00347 }
00348
00349 return ret;
00350 }
00351
00358 BundleRef
00359 BundleList::find(const EndpointID& source_eid,
00360 const BundleTimestamp& creation_ts)
00361 {
00362 oasys::ScopeLock l(lock_, "BundleList::find");
00363 BundleRef ret("BundleList::find() temporary");
00364
00365 for (iterator iter = begin(); iter != end(); ++iter) {
00366 if ((*iter)->source_.equals(source_eid) &&
00367 (*iter)->creation_ts_.seconds_ == creation_ts.seconds_ &&
00368 (*iter)->creation_ts_.seqno_ == creation_ts.seqno_)
00369 {
00370 ret = *iter;
00371 return ret;
00372 }
00373 }
00374
00375 return ret;
00376 }
00377
00381 void
00382 BundleList::move_contents(BundleList* other)
00383 {
00384 oasys::ScopeLock l1(lock_, "BundleList::move_contents");
00385 oasys::ScopeLock l2(other->lock_, "BundleList::move_contents");
00386
00387 BundleRef b("BundleList::move_contents temporary");
00388 while (!list_.empty()) {
00389 b = pop_front();
00390 other->push_back(b.object());
00391 }
00392 }
00393
00397 void
00398 BundleList::clear()
00399 {
00400 oasys::ScopeLock l(lock_, "BundleList::clear");
00401
00402 while (!list_.empty()) {
00403 BundleRef b("BundleList::clear temporary");
00404 b = pop_front();
00405 }
00406 }
00407
00408
00412 size_t
00413 BundleList::size() const
00414 {
00415 oasys::ScopeLock l(lock_, "BundleList::size");
00416 return list_.size();
00417 }
00418
00424 BundleList::iterator
00425 BundleList::begin()
00426 {
00427 if (!lock_->is_locked_by_me())
00428 PANIC("Must lock BundleList before using iterator");
00429
00430 return list_.begin();
00431 }
00432
00438 BundleList::iterator
00439 BundleList::end()
00440 {
00441 if (!lock_->is_locked_by_me())
00442 PANIC("Must lock BundleList before using iterator");
00443
00444 return list_.end();
00445 }
00446
00452 BundleList::const_iterator
00453 BundleList::begin() const
00454 {
00455 if (!lock_->is_locked_by_me())
00456 PANIC("Must lock BundleList before using iterator");
00457
00458 return list_.begin();
00459 }
00460
00461
00467 BundleList::const_iterator
00468 BundleList::end() const
00469 {
00470 if (!lock_->is_locked_by_me())
00471 PANIC("Must lock BundleList before using iterator");
00472
00473 return list_.end();
00474 }
00475
00476 BlockingBundleList::BlockingBundleList(const std::string& name)
00477 : BundleList(name)
00478 {
00479 notifier_ = new oasys::Notifier(logpath_);
00480 }
00481
00482 BlockingBundleList::~BlockingBundleList()
00483 {
00484 delete notifier_;
00485 notifier_ = NULL;
00486 }
00487
00495 BundleRef
00496 BlockingBundleList::pop_blocking(int timeout)
00497 {
00498 ASSERT(notifier_);
00499
00500 log_debug("pop_blocking on list %p [%s]",
00501 this, name_.c_str());
00502
00503 lock_->lock("BlockingBundleList::pop_blocking");
00504
00505 bool used_wait;
00506 if (list_.empty()) {
00507 used_wait = true;
00508 bool notified = notifier_->wait(lock_, timeout);
00509 ASSERT(lock_->is_locked_by_me());
00510
00511
00512
00513 if (!notified) {
00514 lock_->unlock();
00515 log_debug("pop_blocking timeout on list %p", this);
00516
00517 return BundleRef("BlockingBundleList::pop_blocking temporary");
00518 }
00519 } else {
00520 used_wait = false;
00521 }
00522
00523
00524
00525 ASSERT(!list_.empty());
00526
00527 BundleRef ret("BlockingBundleList::pop_blocking temporary");
00528 ret = pop_front(used_wait);
00529
00530 lock_->unlock();
00531
00532 log_debug("pop_blocking got bundle %p from list %p",
00533 ret.object(), this);
00534
00535 return ret;
00536 }
00537
00538
00539 }