BundleList.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2004-2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 
00018 #include <algorithm>
00019 #include <stdlib.h>
00020 #include <oasys/thread/SpinLock.h>
00021 
00022 #include "Bundle.h"
00023 #include "BundleList.h"
00024 #include "BundleTimestamp.h"
00025 
00026 namespace dtn {
00027 
00028 // XXX/demmer want some sort of expiration handler registration per
00029 // list so things know when their bundles have been expired
00030 
00031 BundleList::BundleList(const std::string& name)
00032     : Logger("BundleList", "/dtn/bundle/list/%s", name.c_str()),
00033       name_(name), lock_(new oasys::SpinLock()), notifier_(NULL)
00034 {
00035 }
00036 
00037 void
00038 BundleList::set_name(const std::string& name)
00039 {
00040     name_ = name;
00041     logpathf("/dtn/bundle/list/%s", name.c_str());
00042 }
00043 
00044 BundleList::~BundleList()
00045 {
00046     clear();
00047     delete lock_;
00048 }
00049 
00053 BundleRef
00054 BundleList::front()
00055 {
00056     oasys::ScopeLock l(lock_, "BundleList::front");
00057     
00058     BundleRef ret("BundleList::front() temporary");
00059     if (list_.empty())
00060         return ret;
00061 
00062     ret = list_.front();
00063     return ret;
00064 }
00065 
00069 BundleRef
00070 BundleList::back()
00071 {
00072     oasys::ScopeLock l(lock_, "BundleList::back");
00073 
00074     BundleRef ret("BundleList::back() temporary");
00075     if (list_.empty())
00076         return ret;
00077 
00078     ret = list_.back();
00079     return ret;
00080 }
00081 
00086 void
00087 BundleList::add_bundle(Bundle* b, const iterator& pos)
00088 {
00089     ASSERT(lock_->is_locked_by_me());
00090     ASSERT(b->lock_.is_locked_by_me());
00091 
00092     if (b->mappings_.insert(this).second == false) {
00093         log_err("ERROR in add bundle: "
00094                 "bundle id %d already on list [%s]",
00095                 b->bundleid_, name_.c_str());
00096 
00097         return;
00098     }
00099 
00100     list_.insert(pos, b);
00101     b->add_ref("bundle_list", name_.c_str()); 
00102     if (notifier_ != 0) {
00103         notifier_->notify();
00104     }
00105 
00106     log_debug("bundle id %d add mapping [%s] to list %p",
00107               b->bundleid_, name_.c_str(), this);
00108 }
00109 
00113 void
00114 BundleList::push_front(Bundle* b)
00115 {
00116     oasys::ScopeLock l(lock_, "BundleList::push_front");
00117     oasys::ScopeLock bl(&b->lock_, "BundleList::push_front");
00118     add_bundle(b, list_.begin());
00119 }
00120 
00124 void
00125 BundleList::push_back(Bundle* b)
00126 {
00127     oasys::ScopeLock l(lock_, "BundleList::push_back");
00128     oasys::ScopeLock bl(&b->lock_, "BundleList::push_back");
00129     add_bundle(b, list_.end());
00130 }
00131         
00135 void
00136 BundleList::insert_sorted(Bundle* b, sort_order_t sort_order)
00137 {
00138     iterator iter;
00139     oasys::ScopeLock l(lock_, "BundleList::insert_sorted");
00140     oasys::ScopeLock bl(&b->lock_, "BundleList::insert_sorted");
00141 
00142     // scan through the list until the iterator either a) reaches the
00143     // end of the list or b) reaches the bundle that should follow the
00144     // new insertion in the list. once the loop is done therefore, the
00145     // insert() call will then always put the bundle in the right
00146     // place
00147     //
00148     // XXX/demmer there's probably a more stl-ish way to do this but i
00149     // don't know what it is 
00150     
00151     for (iter = list_.begin(); iter != list_.end(); ++iter)
00152     {
00153         if (sort_order == SORT_FRAG_OFFSET) {
00154             if ((*iter)->frag_offset_ > b->frag_offset_) {
00155                 break;
00156             }
00157 
00158         } else if (sort_order == SORT_PRIORITY) {
00159             NOTIMPLEMENTED;
00160             
00161         } else {
00162             PANIC("invalid value for sort order %d", sort_order);
00163         }
00164     }
00165     
00166     add_bundle(b, iter);
00167 }
00168 
00173 void
00174 BundleList::insert_random(Bundle* b)
00175 {
00176     iterator iter;
00177     oasys::ScopeLock l(lock_, "BundleList::insert_random");
00178     oasys::ScopeLock bl(&b->lock_, "BundleList::insert_random");
00179 
00180     iter = begin();
00181     int location = 0;
00182     if (size() != 0) {
00183         location = random() % size();
00184     }
00185 
00186     log_info("insert_random at %d/%zu", location, size());
00187     
00188     for (int i = 0; i < location; ++i) {
00189         ++iter;
00190     }
00191 
00192     add_bundle(b, iter);
00193 }
00194 
00198 Bundle*
00199 BundleList::del_bundle(const iterator& pos, bool used_notifier)
00200 {
00201     Bundle* b = *pos;
00202     ASSERT(lock_->is_locked_by_me());
00203     
00204     // lock the bundle
00205     oasys::ScopeLock l(& b->lock_, "BundleList::del_bundle");
00206 
00207     // remove the mapping
00208     log_debug("bundle id %d del_bundle: deleting mapping [%s]",
00209               b->bundleid_, name_.c_str());
00210     
00211     Bundle::BundleMappings::iterator mapping_pos = b->mappings_.find(this);
00212     if (mapping_pos == b->mappings_.end()) {
00213         log_err("ERROR in del bundle: "
00214                 "bundle id %d not on list [%s]",
00215                 b->bundleid_, name_.c_str());
00216     } else {
00217         b->mappings_.erase(mapping_pos);
00218     }
00219 
00220     // remove the bundle from the list
00221     list_.erase(pos);
00222     
00223     // drain one element from the semaphore
00224     if (notifier_ && !used_notifier) {
00225         notifier_->drain_pipe(1);
00226     }
00227 
00228     // note that we explicitly do _not_ decrement the reference count
00229     // since the reference is passed to the calling function
00230     
00231     return b;
00232 }
00233 
00237 BundleRef
00238 BundleList::pop_front(bool used_notifier)
00239 {
00240     oasys::ScopeLock l(lock_, "pop_front");
00241 
00242     BundleRef ret("BundleList::pop_front() temporary");
00243 
00244     if (list_.empty()) {
00245         return ret;
00246     }
00247     
00248     ASSERT(list_.size() != 0);
00249 
00250     // Assign the bundle to a temporary reference, then remove the
00251     // list reference on the bundle and return the temporary
00252     ret = del_bundle(list_.begin(), used_notifier);
00253     ret.object()->del_ref("bundle_list", name_.c_str()); 
00254     return ret;
00255 }
00256 
00260 BundleRef
00261 BundleList::pop_back(bool used_notifier)
00262 {
00263     oasys::ScopeLock l(lock_, "BundleList::pop_back");
00264 
00265     BundleRef ret("BundleList::pop_back() temporary");
00266 
00267     if (list_.empty()) {
00268         return ret;
00269     }
00270 
00271     // Assign the bundle to a temporary reference, then remove the
00272     // list reference on the bundle and return the temporary
00273     ret = del_bundle(--list_.end(), used_notifier);
00274     ret->del_ref("bundle_list", name_.c_str()); 
00275     return ret;
00276 }
00277 
00285 bool
00286 BundleList::erase(Bundle* bundle, bool used_notifier)
00287 {
00288     if (bundle == NULL) {
00289         return false;
00290     }
00291 
00292     ASSERTF(!bundle->lock_.is_locked_by_me(),
00293             "bundle cannot be locked in erase due to potential deadlock");
00294     
00295     oasys::ScopeLock l(lock_, "BundleList::erase");
00296 
00297     iterator pos = std::find(begin(), end(), bundle);
00298     if (pos == end()) {
00299         return false;
00300     }
00301 
00302     del_bundle(pos, used_notifier);
00303     
00304     bundle->del_ref("bundle_list", name_.c_str());
00305     return true;
00306 }
00307 
00313 bool
00314 BundleList::contains(Bundle* bundle)
00315 {
00316     if (bundle == NULL) {
00317         return false;
00318     }
00319     
00320     bool ret = bundle->is_queued_on(this);
00321 
00322 #define DEBUG_MAPPINGS
00323 #ifdef DEBUG_MAPPINGS
00324     oasys::ScopeLock l(lock_, "BundleList::contains");
00325     bool ret2 = (std::find(begin(), end(), bundle) != end());
00326     ASSERT(ret == ret2);
00327 #endif
00328 
00329     return ret;
00330 }
00331 
00337 BundleRef
00338 BundleList::find(u_int32_t bundle_id)
00339 {
00340     oasys::ScopeLock l(lock_, "BundleList::find");
00341     BundleRef ret("BundleList::find() temporary");
00342     for (iterator iter = begin(); iter != end(); ++iter) {
00343         if ((*iter)->bundleid_ == bundle_id) {
00344             ret = *iter;
00345             return ret;
00346         }
00347     }
00348 
00349     return ret;
00350 }
00351 
00358 BundleRef
00359 BundleList::find(const EndpointID& source_eid,
00360                  const BundleTimestamp& creation_ts)
00361 {
00362     oasys::ScopeLock l(lock_, "BundleList::find");
00363     BundleRef ret("BundleList::find() temporary");
00364     
00365     for (iterator iter = begin(); iter != end(); ++iter) {
00366         if ((*iter)->source_.equals(source_eid) &&
00367             (*iter)->creation_ts_.seconds_ == creation_ts.seconds_ &&
00368             (*iter)->creation_ts_.seqno_ == creation_ts.seqno_)
00369         {
00370             ret = *iter;
00371             return ret;
00372         }
00373     }
00374 
00375     return ret;
00376 }
00377 
00381 void
00382 BundleList::move_contents(BundleList* other)
00383 {
00384     oasys::ScopeLock l1(lock_, "BundleList::move_contents");
00385     oasys::ScopeLock l2(other->lock_, "BundleList::move_contents");
00386 
00387     BundleRef b("BundleList::move_contents temporary");
00388     while (!list_.empty()) {
00389         b = pop_front();
00390         other->push_back(b.object());
00391     }
00392 }
00393 
00397 void
00398 BundleList::clear()
00399 {
00400     oasys::ScopeLock l(lock_, "BundleList::clear");
00401     
00402     while (!list_.empty()) {
00403         BundleRef b("BundleList::clear temporary");
00404         b = pop_front();
00405     }
00406 }
00407 
00408 
00412 size_t
00413 BundleList::size() const
00414 {
00415     oasys::ScopeLock l(lock_, "BundleList::size");
00416     return list_.size();
00417 }
00418 
00424 BundleList::iterator
00425 BundleList::begin()
00426 {
00427     if (!lock_->is_locked_by_me())
00428         PANIC("Must lock BundleList before using iterator");
00429     
00430     return list_.begin();
00431 }
00432 
00438 BundleList::iterator
00439 BundleList::end()
00440 {
00441     if (!lock_->is_locked_by_me())
00442         PANIC("Must lock BundleList before using iterator");
00443     
00444     return list_.end();
00445 }
00446 
00452 BundleList::const_iterator
00453 BundleList::begin() const
00454 {
00455     if (!lock_->is_locked_by_me())
00456         PANIC("Must lock BundleList before using iterator");
00457     
00458     return list_.begin();
00459 }
00460 
00461 
00467 BundleList::const_iterator
00468 BundleList::end() const
00469 {
00470     if (!lock_->is_locked_by_me())
00471         PANIC("Must lock BundleList before using iterator");
00472     
00473     return list_.end();
00474 }
00475 
00476 BlockingBundleList::BlockingBundleList(const std::string& name)
00477     : BundleList(name)
00478 {
00479     notifier_ = new oasys::Notifier(logpath_);
00480 }
00481 
00482 BlockingBundleList::~BlockingBundleList()
00483 {
00484     delete notifier_;
00485     notifier_ = NULL;
00486 }
00487 
00495 BundleRef
00496 BlockingBundleList::pop_blocking(int timeout)
00497 {
00498     ASSERT(notifier_);
00499 
00500     log_debug("pop_blocking on list %p [%s]", 
00501               this, name_.c_str());
00502     
00503     lock_->lock("BlockingBundleList::pop_blocking");
00504 
00505     bool used_wait;
00506     if (list_.empty()) {
00507         used_wait = true;
00508         bool notified = notifier_->wait(lock_, timeout);
00509         ASSERT(lock_->is_locked_by_me());
00510 
00511         // if the timeout occurred, wait returns false, so there's
00512         // still nothing on the list
00513         if (!notified) {
00514             lock_->unlock();
00515             log_debug("pop_blocking timeout on list %p", this);
00516 
00517             return BundleRef("BlockingBundleList::pop_blocking temporary");
00518         }
00519     } else {
00520         used_wait = false;
00521     }
00522     
00523     // This can't be empty if we got notified, unless there is another
00524     // thread waiting on the queue - which is an error.
00525     ASSERT(!list_.empty());
00526     
00527     BundleRef ret("BlockingBundleList::pop_blocking temporary");
00528     ret = pop_front(used_wait);
00529     
00530     lock_->unlock();
00531 
00532     log_debug("pop_blocking got bundle %p from list %p", 
00533               ret.object(), this);
00534     
00535     return ret;
00536 }
00537 
00538 
00539 } // namespace dtn

Generated on Thu Jun 7 16:56:48 2007 for DTN Reference Implementation by  doxygen 1.5.1