Bundle.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2004-2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 
00018 #include <oasys/debug/DebugUtils.h>
00019 #include <oasys/thread/SpinLock.h>
00020 
00021 #include "Bundle.h"
00022 #include "BundleDaemon.h"
00023 #include "BundleList.h"
00024 #include "ExpirationTimer.h"
00025 
00026 #include "storage/GlobalStore.h"
00027 
00028 namespace dtn {
00029 
00030 //----------------------------------------------------------------------
00031 void
00032 Bundle::init(u_int32_t id)
00033 {
00034     bundleid_           = id;
00035     is_fragment_        = false;
00036     is_admin_           = false;
00037     do_not_fragment_    = false;
00038     in_datastore_       = false;
00039     custody_requested_  = false;
00040     local_custody_      = false;
00041     singleton_dest_     = true;
00042     priority_           = COS_NORMAL;
00043     receive_rcpt_       = false;
00044     custody_rcpt_       = false;
00045     forward_rcpt_       = false;
00046     delivery_rcpt_      = false;
00047     deletion_rcpt_      = false;
00048     app_acked_rcpt_     = false;
00049     orig_length_        = 0;
00050     frag_offset_        = 0;
00051     expiration_         = 0;
00052     owner_              = "";
00053 
00054     // as per the spec, the creation timestamp should be calculated as
00055     // seconds since 1/1/1970, and since the bundle id should be
00056     // monotonically increasing, it's safe to use that for the seqno
00057     creation_ts_.seconds_ = BundleTimestamp::get_current_time();
00058     creation_ts_.seqno_   = bundleid_;
00059 
00060     log_debug_p("/dtn/bundle", "Bundle::init bundle id %d", id);
00061 }
00062 
00063 //----------------------------------------------------------------------
00064 Bundle::Bundle(BundlePayload::location_t location)
00065     : payload_(&lock_), fwdlog_(&lock_)
00066 {
00067     u_int32_t id = GlobalStore::instance()->next_bundleid();
00068     init(id);
00069     payload_.init(id, location);
00070     refcount_         = 0;
00071     expiration_timer_ = NULL;
00072     freed_            = false;
00073 }
00074 
00075 //----------------------------------------------------------------------
00076 Bundle::Bundle(const oasys::Builder&)
00077     : payload_(&lock_), fwdlog_(&lock_)
00078 {
00079     // don't do anything here except set the id to a bogus default
00080     // value and make sure the expiration timer is NULL, since the
00081     // fields are set and the payload initialized when loading from
00082     // the database
00083     refcount_         = 0;
00084     bundleid_         = 0xffffffff;
00085     expiration_timer_ = NULL;
00086     freed_            = false;
00087 }
00088 
00089 //----------------------------------------------------------------------
00090 Bundle::~Bundle()
00091 {
00092     log_debug_p("/dtn/bundle/free", "destroying bundle id %d", bundleid_);
00093     
00094     ASSERT(mappings_.size() == 0);
00095     bundleid_ = 0xdeadf00d;
00096 
00097     ASSERTF(expiration_timer_ == NULL,
00098             "bundle deleted with pending expiration timer");
00099 
00100 }
00101 
00102 //----------------------------------------------------------------------
00103 int
00104 Bundle::format(char* buf, size_t sz) const
00105 {
00106     return snprintf(buf, sz, "bundle id %d %s -> %s (%s%s%d bytes payload)",
00107                     bundleid_, source_.c_str(), dest_.c_str(),
00108                     is_admin_    ? "is_admin " : "",
00109                     is_fragment_ ? "is_fragment " : "",
00110                     (u_int32_t)payload_.length());
00111 }
00112 
00113 //----------------------------------------------------------------------
00114 void
00115 Bundle::format_verbose(oasys::StringBuffer* buf)
00116 {
00117 
00118 #define bool_to_str(x)   ((x) ? "true" : "false")
00119 
00120     buf->appendf("bundle id %d:\n", bundleid_);
00121     buf->appendf("            source: %s\n", source_.c_str());
00122     buf->appendf("              dest: %s\n", dest_.c_str());
00123     buf->appendf("         custodian: %s\n", custodian_.c_str());
00124     buf->appendf("           replyto: %s\n", replyto_.c_str());
00125     buf->appendf("           prevhop: %s\n", prevhop_.c_str());
00126     buf->appendf("    payload_length: %zu\n", payload_.length());
00127     buf->appendf("          priority: %d\n", priority_);
00128     buf->appendf(" custody_requested: %s\n", bool_to_str(custody_requested_));
00129     buf->appendf("     local_custody: %s\n", bool_to_str(local_custody_));
00130     buf->appendf("    singleton_dest: %s\n", bool_to_str(singleton_dest_));
00131     buf->appendf("      receive_rcpt: %s\n", bool_to_str(receive_rcpt_));
00132     buf->appendf("      custody_rcpt: %s\n", bool_to_str(custody_rcpt_));
00133     buf->appendf("      forward_rcpt: %s\n", bool_to_str(forward_rcpt_));
00134     buf->appendf("     delivery_rcpt: %s\n", bool_to_str(delivery_rcpt_));
00135     buf->appendf("     deletion_rcpt: %s\n", bool_to_str(deletion_rcpt_));
00136     buf->appendf("    app_acked_rcpt: %s\n", bool_to_str(app_acked_rcpt_));
00137     buf->appendf("       creation_ts: %u.%u\n",
00138                  creation_ts_.seconds_, creation_ts_.seqno_);
00139     buf->appendf("        expiration: %d\n", expiration_);
00140     buf->appendf("       is_fragment: %s\n", bool_to_str(is_fragment_));
00141     buf->appendf("          is_admin: %s\n", bool_to_str(is_admin_));
00142     buf->appendf("   do_not_fragment: %s\n", bool_to_str(do_not_fragment_));
00143     buf->appendf("       orig_length: %d\n", orig_length_);
00144     buf->appendf("       frag_offset: %d\n", frag_offset_);
00145     buf->appendf("transmission_count: %zu\n", fwdlog_.get_transmission_count());
00146 }
00147 
00148 //----------------------------------------------------------------------
00149 void
00150 Bundle::serialize(oasys::SerializeAction* a)
00151 {
00152     a->process("bundleid", &bundleid_);
00153     a->process("is_fragment", &is_fragment_);
00154     a->process("is_admin", &is_admin_);
00155     a->process("do_not_fragment", &do_not_fragment_);
00156     a->process("source", &source_);
00157     a->process("dest", &dest_);
00158     a->process("custodian", &custodian_);
00159     a->process("replyto", &replyto_);
00160     a->process("prevhop", &prevhop_);    
00161     a->process("priority", &priority_);
00162     a->process("custody_requested", &custody_requested_);
00163     a->process("local_custody", &local_custody_);
00164     a->process("singleton_dest", &singleton_dest_);
00165     a->process("custody_rcpt", &custody_rcpt_);
00166     a->process("receive_rcpt", &receive_rcpt_);
00167     a->process("forward_rcpt", &forward_rcpt_);
00168     a->process("delivery_rcpt", &delivery_rcpt_);
00169     a->process("deletion_rcpt", &deletion_rcpt_);
00170     a->process("app_acked_rcpt", &app_acked_rcpt_);
00171     a->process("creation_ts_seconds", &creation_ts_.seconds_);
00172     a->process("creation_ts_seqno", &creation_ts_.seqno_);
00173     a->process("expiration", &expiration_);
00174     a->process("payload", &payload_);
00175     a->process("orig_length", &orig_length_);
00176     a->process("frag_offset", &frag_offset_);
00177     a->process("owner", &owner_);
00178     a->process("recv_blocks", &recv_blocks_);
00179     a->process("api_blocks", &api_blocks_);
00180     
00181     // XXX/TODO serialize the forwarding log and make sure it's
00182     // updated on disk as it changes in memory
00183     //a->process("forwarding_log", &fwdlog_);
00184 
00185     if (a->action_code() == oasys::Serialize::UNMARSHAL) {
00186         in_datastore_ = true;
00187         payload_.init_from_store(bundleid_);
00188     }
00189 }
00190     
00191 //----------------------------------------------------------------------
00192 void
00193 Bundle::copy_metadata(Bundle* new_bundle)
00194 {
00195     new_bundle->is_admin_               = is_admin_;
00196     new_bundle->is_fragment_            = is_fragment_;
00197     new_bundle->do_not_fragment_        = do_not_fragment_;
00198     new_bundle->source_                 = source_;
00199     new_bundle->dest_                   = dest_;
00200     new_bundle->custodian_              = custodian_;
00201     new_bundle->replyto_                = replyto_;
00202     new_bundle->priority_               = priority_;
00203     new_bundle->custody_requested_      = custody_requested_;
00204     new_bundle->local_custody_          = false;
00205     new_bundle->singleton_dest_         = singleton_dest_;
00206     new_bundle->custody_rcpt_           = custody_rcpt_;
00207     new_bundle->receive_rcpt_           = receive_rcpt_;
00208     new_bundle->forward_rcpt_           = forward_rcpt_;
00209     new_bundle->delivery_rcpt_          = delivery_rcpt_;
00210     new_bundle->deletion_rcpt_          = deletion_rcpt_;
00211     new_bundle->app_acked_rcpt_         = app_acked_rcpt_;
00212     new_bundle->creation_ts_            = creation_ts_;
00213     new_bundle->expiration_             = expiration_;
00214 }
00215 
00216 //----------------------------------------------------------------------
00217 int
00218 Bundle::add_ref(const char* what1, const char* what2)
00219 {
00220     (void)what1;
00221     (void)what2;
00222     
00223     oasys::ScopeLock l(&lock_, "Bundle::add_ref");
00224 
00225     ASSERTF(freed_ == false, "Bundle::add_ref on bundle %d (%p)"
00226             "called when bundle is already being freed!", bundleid_, this);
00227     
00228     ASSERT(refcount_ >= 0);
00229     int ret = ++refcount_;
00230     log_debug_p("/dtn/bundle/refs",
00231                 "bundle id %d (%p): refcount %d -> %d (%zu mappings) add %s %s",
00232                 bundleid_, this, refcount_ - 1, refcount_,
00233                 mappings_.size(), what1, what2);
00234     return ret;
00235 }
00236 
00237 //----------------------------------------------------------------------
00238 int
00239 Bundle::del_ref(const char* what1, const char* what2)
00240 {
00241     (void)what1;
00242     (void)what2;
00243     
00244     oasys::ScopeLock l(&lock_, "Bundle::del_ref");
00245 
00246     ASSERTF(freed_ == false, "Bundle::del_ref on bundle %d (%p)"
00247             "called when bundle is already being freed!", bundleid_, this);
00248     
00249     int ret = --refcount_;
00250     log_debug_p("/dtn/bundle/refs",
00251                 "bundle id %d (%p): refcount %d -> %d (%zu mappings) del %s %s",
00252                 bundleid_, this, refcount_ + 1, refcount_,
00253                 mappings_.size(), what1, what2);
00254     
00255     if (refcount_ != 0) {
00256         return ret;
00257     }
00258 
00259     freed_ = true;
00260     
00261     log_debug_p("/dtn/bundle",
00262                 "bundle id %d (%p): no more references, posting free event",
00263                 bundleid_, this);
00264 
00265     BundleDaemon::instance()->post(new BundleFreeEvent(this));
00266     
00267     return 0;
00268 }
00269 
00270 //----------------------------------------------------------------------
00271 Bundle::MappingsIterator
00272 Bundle::mappings_begin()
00273 {
00274     if (!lock_.is_locked_by_me())
00275         PANIC("Must lock Bundle before using mappings iterator");
00276     
00277     return mappings_.begin();
00278 }
00279     
00280 //----------------------------------------------------------------------
00281 Bundle::MappingsIterator
00282 Bundle::mappings_end()
00283 {
00284     if (!lock_.is_locked_by_me())
00285         PANIC("Must lock Bundle before using mappings iterator");
00286     
00287     return mappings_.end();
00288 }
00289 
00290 //----------------------------------------------------------------------
00291 bool
00292 Bundle::is_queued_on(BundleList* bundle_list)
00293 {
00294     oasys::ScopeLock l(&lock_, "Bundle::is_queued_on");
00295     return (mappings_.count(bundle_list) > 0);
00296 }
00297 
00298 //----------------------------------------------------------------------
00299 bool
00300 Bundle::validate(oasys::StringBuffer* errbuf)
00301 {
00302     if (!source_.valid()) {
00303         errbuf->appendf("invalid source eid [%s]", source_.c_str());
00304         return false;
00305     }
00306     
00307     if (!dest_.valid()) {
00308         errbuf->appendf("invalid dest eid [%s]", dest_.c_str());
00309         return false;
00310     }
00311 
00312     if (!replyto_.valid()) {
00313         errbuf->appendf("invalid replyto eid [%s]", replyto_.c_str());
00314         return false;
00315     }
00316 
00317     if (!custodian_.valid()) {
00318         errbuf->appendf("invalid custodian eid [%s]", custodian_.c_str());
00319         return false;
00320     }
00321 
00322     return true;
00323     
00324 }
00325 
00326 } // namespace dtn

Generated on Thu Jun 7 16:56:48 2007 for DTN Reference Implementation by  doxygen 1.5.1