00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include <oasys/debug/DebugUtils.h>
00040 #include <oasys/thread/SpinLock.h>
00041
00042 #include "Bundle.h"
00043 #include "BundleDaemon.h"
00044 #include "BundleList.h"
00045 #include "ExpirationTimer.h"
00046
00047 #include "storage/GlobalStore.h"
00048
00049 namespace dtn {
00050
00051
00052 void
00053 Bundle::init(u_int32_t id)
00054 {
00055 bundleid_ = id;
00056 is_fragment_ = false;
00057 is_admin_ = false;
00058 do_not_fragment_ = false;
00059 in_datastore_ = false;
00060 custody_requested_ = false;
00061 local_custody_ = false;
00062 singleton_dest_ = true;
00063 priority_ = COS_NORMAL;
00064 receive_rcpt_ = false;
00065 custody_rcpt_ = false;
00066 forward_rcpt_ = false;
00067 delivery_rcpt_ = false;
00068 deletion_rcpt_ = false;
00069 app_acked_rcpt_ = false;
00070 orig_length_ = 0;
00071 frag_offset_ = 0;
00072 expiration_ = 0;
00073 owner_ = "";
00074
00075
00076
00077
00078 creation_ts_.seconds_ = BundleTimestamp::get_current_time();
00079 creation_ts_.seqno_ = bundleid_;
00080
00081 log_debug("/dtn/bundle", "Bundle::init bundle id %d", id);
00082 }
00083
00084
00085 Bundle::Bundle()
00086 : payload_(&lock_), fwdlog_(&lock_)
00087 {
00088 u_int32_t id = GlobalStore::instance()->next_bundleid();
00089 init(id);
00090 payload_.init(id);
00091 refcount_ = 0;
00092 expiration_timer_ = NULL;
00093 freed_ = false;
00094 }
00095
00096
00097 Bundle::Bundle(const oasys::Builder&)
00098 : payload_(&lock_), fwdlog_(&lock_)
00099 {
00100
00101
00102
00103
00104 refcount_ = 0;
00105 bundleid_ = 0xffffffff;
00106 expiration_timer_ = NULL;
00107 freed_ = false;
00108 }
00109
00110
00111 Bundle::~Bundle()
00112 {
00113 log_debug("/dtn/bundle/free", "destroying bundle id %d", bundleid_);
00114
00115 ASSERT(mappings_.size() == 0);
00116 bundleid_ = 0xdeadf00d;
00117
00118 ASSERTF(expiration_timer_ == NULL,
00119 "bundle deleted with pending expiration timer");
00120
00121 }
00122
00123
00124 int
00125 Bundle::format(char* buf, size_t sz) const
00126 {
00127 return snprintf(buf, sz, "bundle id %d %s -> %s (%s%s%d bytes payload)",
00128 bundleid_, source_.c_str(), dest_.c_str(),
00129 is_admin_ ? "is_admin " : "",
00130 is_fragment_ ? "is_fragment " : "",
00131 (u_int32_t)payload_.length());
00132 }
00133
00134
00135 void
00136 Bundle::format_verbose(oasys::StringBuffer* buf)
00137 {
00138
00139 #define bool_to_str(x) ((x) ? "true" : "false")
00140
00141 buf->appendf("bundle id %d:\n", bundleid_);
00142 buf->appendf(" source: %s\n", source_.c_str());
00143 buf->appendf(" dest: %s\n", dest_.c_str());
00144 buf->appendf(" custodian: %s\n", custodian_.c_str());
00145 buf->appendf(" replyto: %s\n", replyto_.c_str());
00146 buf->appendf(" payload_length: %zu\n", payload_.length());
00147 buf->appendf(" priority: %d\n", priority_);
00148 buf->appendf(" custody_requested: %s\n", bool_to_str(custody_requested_));
00149 buf->appendf(" local_custody: %s\n", bool_to_str(local_custody_));
00150 buf->appendf(" singleton_dest: %s\n", bool_to_str(singleton_dest_));
00151 buf->appendf(" receive_rcpt: %s\n", bool_to_str(receive_rcpt_));
00152 buf->appendf(" custody_rcpt: %s\n", bool_to_str(custody_rcpt_));
00153 buf->appendf(" forward_rcpt: %s\n", bool_to_str(forward_rcpt_));
00154 buf->appendf(" delivery_rcpt: %s\n", bool_to_str(delivery_rcpt_));
00155 buf->appendf(" deletion_rcpt: %s\n", bool_to_str(deletion_rcpt_));
00156 buf->appendf(" app_acked_rcpt: %s\n", bool_to_str(app_acked_rcpt_));
00157 buf->appendf(" creation_ts: %u.%u\n",
00158 creation_ts_.seconds_, creation_ts_.seqno_);
00159 buf->appendf(" expiration: %d\n", expiration_);
00160 buf->appendf(" is_fragment: %s\n", bool_to_str(is_fragment_));
00161 buf->appendf(" is_admin: %s\n", bool_to_str(is_admin_));
00162 buf->appendf(" do_not_fragment: %s\n", bool_to_str(do_not_fragment_));
00163 buf->appendf(" orig_length: %d\n", orig_length_);
00164 buf->appendf(" frag_offset: %d\n", frag_offset_);
00165 buf->appendf("transmission_count: %zu\n", fwdlog_.get_transmission_count());
00166 }
00167
00168
00169 void
00170 Bundle::serialize(oasys::SerializeAction* a)
00171 {
00172 a->process("bundleid", &bundleid_);
00173 a->process("is_fragment", &is_fragment_);
00174 a->process("is_admin", &is_admin_);
00175 a->process("do_not_fragment", &do_not_fragment_);
00176 a->process("source", &source_);
00177 a->process("dest", &dest_);
00178 a->process("custodian", &custodian_);
00179 a->process("replyto", &replyto_);
00180 a->process("priority", &priority_);
00181 a->process("custody_requested", &custody_requested_);
00182 a->process("local_custody", &local_custody_);
00183 a->process("singleton_dest", &singleton_dest_);
00184 a->process("custody_rcpt", &custody_rcpt_);
00185 a->process("receive_rcpt", &receive_rcpt_);
00186 a->process("forward_rcpt", &forward_rcpt_);
00187 a->process("delivery_rcpt", &delivery_rcpt_);
00188 a->process("deletion_rcpt", &deletion_rcpt_);
00189 a->process("app_acked_rcpt", &app_acked_rcpt_);
00190 a->process("creation_ts_seconds", &creation_ts_.seconds_);
00191 a->process("creation_ts_seqno", &creation_ts_.seqno_);
00192 a->process("expiration", &expiration_);
00193 a->process("payload", &payload_);
00194 a->process("orig_length", &orig_length_);
00195 a->process("frag_offset", &frag_offset_);
00196 a->process("owner", &owner_);
00197
00198
00199
00200
00201
00202 if (a->action_code() == oasys::Serialize::UNMARSHAL) {
00203 in_datastore_ = true;
00204 payload_.init_from_store(bundleid_);
00205 }
00206 }
00207
00208
00209 void
00210 Bundle::copy_metadata(Bundle* new_bundle)
00211 {
00212 new_bundle->is_admin_ = is_admin_;
00213 new_bundle->is_fragment_ = is_fragment_;
00214 new_bundle->do_not_fragment_ = do_not_fragment_;
00215 new_bundle->source_ = source_;
00216 new_bundle->dest_ = dest_;
00217 new_bundle->custodian_ = custodian_;
00218 new_bundle->replyto_ = replyto_;
00219 new_bundle->priority_ = priority_;
00220 new_bundle->custody_requested_ = custody_requested_;
00221 new_bundle->local_custody_ = false;
00222 new_bundle->singleton_dest_ = singleton_dest_;
00223 new_bundle->custody_rcpt_ = custody_rcpt_;
00224 new_bundle->receive_rcpt_ = receive_rcpt_;
00225 new_bundle->forward_rcpt_ = forward_rcpt_;
00226 new_bundle->delivery_rcpt_ = delivery_rcpt_;
00227 new_bundle->deletion_rcpt_ = deletion_rcpt_;
00228 new_bundle->app_acked_rcpt_ = app_acked_rcpt_;
00229 new_bundle->creation_ts_ = creation_ts_;
00230 new_bundle->expiration_ = expiration_;
00231 }
00232
00233
00234 int
00235 Bundle::add_ref(const char* what1, const char* what2)
00236 {
00237 (void)what1;
00238 (void)what2;
00239
00240 oasys::ScopeLock l(&lock_, "Bundle::add_ref");
00241
00242 ASSERTF(freed_ == false, "Bundle::add_ref on bundle %d (%p)"
00243 "called when bundle is already being freed!", bundleid_, this);
00244
00245 ASSERT(refcount_ >= 0);
00246 int ret = ++refcount_;
00247 log_debug("/dtn/bundle/refs",
00248 "bundle id %d (%p): refcount %d -> %d (%zu mappings) add %s %s",
00249 bundleid_, this, refcount_ - 1, refcount_,
00250 mappings_.size(), what1, what2);
00251 return ret;
00252 }
00253
00254
00255 int
00256 Bundle::del_ref(const char* what1, const char* what2)
00257 {
00258 (void)what1;
00259 (void)what2;
00260
00261 oasys::ScopeLock l(&lock_, "Bundle::del_ref");
00262
00263 ASSERTF(freed_ == false, "Bundle::del_ref on bundle %d (%p)"
00264 "called when bundle is already being freed!", bundleid_, this);
00265
00266 int ret = --refcount_;
00267 log_debug("/dtn/bundle/refs",
00268 "bundle id %d (%p): refcount %d -> %d (%zu mappings) del %s %s",
00269 bundleid_, this, refcount_ + 1, refcount_,
00270 mappings_.size(), what1, what2);
00271
00272 if (refcount_ != 0) {
00273 return ret;
00274 }
00275
00276 freed_ = true;
00277
00278 log_debug("/dtn/bundle",
00279 "bundle id %d (%p): no more references, posting free event",
00280 bundleid_, this);
00281
00282 BundleDaemon::instance()->post(new BundleFreeEvent(this));
00283
00284 return 0;
00285 }
00286
00287
00288 Bundle::MappingsIterator
00289 Bundle::mappings_begin()
00290 {
00291 if (!lock_.is_locked_by_me())
00292 PANIC("Must lock Bundle before using mappings iterator");
00293
00294 return mappings_.begin();
00295 }
00296
00297
00298 Bundle::MappingsIterator
00299 Bundle::mappings_end()
00300 {
00301 if (!lock_.is_locked_by_me())
00302 PANIC("Must lock Bundle before using mappings iterator");
00303
00304 return mappings_.end();
00305 }
00306
00307
00308 bool
00309 Bundle::is_queued_on(BundleList* bundle_list)
00310 {
00311 oasys::ScopeLock l(&lock_, "Bundle::is_queued_on");
00312 return (mappings_.count(bundle_list) > 0);
00313 }
00314
00315
00316 bool
00317 Bundle::validate(oasys::StringBuffer* errbuf)
00318 {
00319 if (!source_.valid()) {
00320 errbuf->appendf("invalid source eid [%s]", source_.c_str());
00321 return false;
00322 }
00323
00324 if (!dest_.valid()) {
00325 errbuf->appendf("invalid dest eid [%s]", dest_.c_str());
00326 return false;
00327 }
00328
00329 if (!replyto_.valid()) {
00330 errbuf->appendf("invalid replyto eid [%s]", replyto_.c_str());
00331 return false;
00332 }
00333
00334 if (!custodian_.valid()) {
00335 errbuf->appendf("invalid custodian eid [%s]", custodian_.c_str());
00336 return false;
00337 }
00338
00339 return true;
00340
00341 }
00342
00343 }