00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <oasys/debug/DebugUtils.h>
00019 #include <oasys/thread/SpinLock.h>
00020
00021 #include "Bundle.h"
00022 #include "BundleDaemon.h"
00023 #include "BundleList.h"
00024 #include "ExpirationTimer.h"
00025
00026 #include "storage/GlobalStore.h"
00027
00028 namespace dtn {
00029
00030
00031 void
00032 Bundle::init(u_int32_t id)
00033 {
00034 bundleid_ = id;
00035 is_fragment_ = false;
00036 is_admin_ = false;
00037 do_not_fragment_ = false;
00038 in_datastore_ = false;
00039 custody_requested_ = false;
00040 local_custody_ = false;
00041 singleton_dest_ = true;
00042 priority_ = COS_NORMAL;
00043 receive_rcpt_ = false;
00044 custody_rcpt_ = false;
00045 forward_rcpt_ = false;
00046 delivery_rcpt_ = false;
00047 deletion_rcpt_ = false;
00048 app_acked_rcpt_ = false;
00049 orig_length_ = 0;
00050 frag_offset_ = 0;
00051 expiration_ = 0;
00052 owner_ = "";
00053
00054
00055
00056
00057 creation_ts_.seconds_ = BundleTimestamp::get_current_time();
00058 creation_ts_.seqno_ = bundleid_;
00059
00060 log_debug_p("/dtn/bundle", "Bundle::init bundle id %d", id);
00061 }
00062
00063
00064 Bundle::Bundle(BundlePayload::location_t location)
00065 : payload_(&lock_), fwdlog_(&lock_)
00066 {
00067 u_int32_t id = GlobalStore::instance()->next_bundleid();
00068 init(id);
00069 payload_.init(id, location);
00070 refcount_ = 0;
00071 expiration_timer_ = NULL;
00072 freed_ = false;
00073 }
00074
00075
00076 Bundle::Bundle(const oasys::Builder&)
00077 : payload_(&lock_), fwdlog_(&lock_)
00078 {
00079
00080
00081
00082
00083 refcount_ = 0;
00084 bundleid_ = 0xffffffff;
00085 expiration_timer_ = NULL;
00086 freed_ = false;
00087 }
00088
00089
00090 Bundle::~Bundle()
00091 {
00092 log_debug_p("/dtn/bundle/free", "destroying bundle id %d", bundleid_);
00093
00094 ASSERT(mappings_.size() == 0);
00095 bundleid_ = 0xdeadf00d;
00096
00097 ASSERTF(expiration_timer_ == NULL,
00098 "bundle deleted with pending expiration timer");
00099
00100 }
00101
00102
00103 int
00104 Bundle::format(char* buf, size_t sz) const
00105 {
00106 return snprintf(buf, sz, "bundle id %d %s -> %s (%s%s%d bytes payload)",
00107 bundleid_, source_.c_str(), dest_.c_str(),
00108 is_admin_ ? "is_admin " : "",
00109 is_fragment_ ? "is_fragment " : "",
00110 (u_int32_t)payload_.length());
00111 }
00112
00113
00114 void
00115 Bundle::format_verbose(oasys::StringBuffer* buf)
00116 {
00117
00118 #define bool_to_str(x) ((x) ? "true" : "false")
00119
00120 buf->appendf("bundle id %d:\n", bundleid_);
00121 buf->appendf(" source: %s\n", source_.c_str());
00122 buf->appendf(" dest: %s\n", dest_.c_str());
00123 buf->appendf(" custodian: %s\n", custodian_.c_str());
00124 buf->appendf(" replyto: %s\n", replyto_.c_str());
00125 buf->appendf(" prevhop: %s\n", prevhop_.c_str());
00126 buf->appendf(" payload_length: %zu\n", payload_.length());
00127 buf->appendf(" priority: %d\n", priority_);
00128 buf->appendf(" custody_requested: %s\n", bool_to_str(custody_requested_));
00129 buf->appendf(" local_custody: %s\n", bool_to_str(local_custody_));
00130 buf->appendf(" singleton_dest: %s\n", bool_to_str(singleton_dest_));
00131 buf->appendf(" receive_rcpt: %s\n", bool_to_str(receive_rcpt_));
00132 buf->appendf(" custody_rcpt: %s\n", bool_to_str(custody_rcpt_));
00133 buf->appendf(" forward_rcpt: %s\n", bool_to_str(forward_rcpt_));
00134 buf->appendf(" delivery_rcpt: %s\n", bool_to_str(delivery_rcpt_));
00135 buf->appendf(" deletion_rcpt: %s\n", bool_to_str(deletion_rcpt_));
00136 buf->appendf(" app_acked_rcpt: %s\n", bool_to_str(app_acked_rcpt_));
00137 buf->appendf(" creation_ts: %u.%u\n",
00138 creation_ts_.seconds_, creation_ts_.seqno_);
00139 buf->appendf(" expiration: %d\n", expiration_);
00140 buf->appendf(" is_fragment: %s\n", bool_to_str(is_fragment_));
00141 buf->appendf(" is_admin: %s\n", bool_to_str(is_admin_));
00142 buf->appendf(" do_not_fragment: %s\n", bool_to_str(do_not_fragment_));
00143 buf->appendf(" orig_length: %d\n", orig_length_);
00144 buf->appendf(" frag_offset: %d\n", frag_offset_);
00145 buf->appendf("transmission_count: %zu\n", fwdlog_.get_transmission_count());
00146 }
00147
00148
00149 void
00150 Bundle::serialize(oasys::SerializeAction* a)
00151 {
00152 a->process("bundleid", &bundleid_);
00153 a->process("is_fragment", &is_fragment_);
00154 a->process("is_admin", &is_admin_);
00155 a->process("do_not_fragment", &do_not_fragment_);
00156 a->process("source", &source_);
00157 a->process("dest", &dest_);
00158 a->process("custodian", &custodian_);
00159 a->process("replyto", &replyto_);
00160 a->process("prevhop", &prevhop_);
00161 a->process("priority", &priority_);
00162 a->process("custody_requested", &custody_requested_);
00163 a->process("local_custody", &local_custody_);
00164 a->process("singleton_dest", &singleton_dest_);
00165 a->process("custody_rcpt", &custody_rcpt_);
00166 a->process("receive_rcpt", &receive_rcpt_);
00167 a->process("forward_rcpt", &forward_rcpt_);
00168 a->process("delivery_rcpt", &delivery_rcpt_);
00169 a->process("deletion_rcpt", &deletion_rcpt_);
00170 a->process("app_acked_rcpt", &app_acked_rcpt_);
00171 a->process("creation_ts_seconds", &creation_ts_.seconds_);
00172 a->process("creation_ts_seqno", &creation_ts_.seqno_);
00173 a->process("expiration", &expiration_);
00174 a->process("payload", &payload_);
00175 a->process("orig_length", &orig_length_);
00176 a->process("frag_offset", &frag_offset_);
00177 a->process("owner", &owner_);
00178 a->process("recv_blocks", &recv_blocks_);
00179 a->process("api_blocks", &api_blocks_);
00180
00181
00182
00183
00184
00185 if (a->action_code() == oasys::Serialize::UNMARSHAL) {
00186 in_datastore_ = true;
00187 payload_.init_from_store(bundleid_);
00188 }
00189 }
00190
00191
00192 void
00193 Bundle::copy_metadata(Bundle* new_bundle)
00194 {
00195 new_bundle->is_admin_ = is_admin_;
00196 new_bundle->is_fragment_ = is_fragment_;
00197 new_bundle->do_not_fragment_ = do_not_fragment_;
00198 new_bundle->source_ = source_;
00199 new_bundle->dest_ = dest_;
00200 new_bundle->custodian_ = custodian_;
00201 new_bundle->replyto_ = replyto_;
00202 new_bundle->priority_ = priority_;
00203 new_bundle->custody_requested_ = custody_requested_;
00204 new_bundle->local_custody_ = false;
00205 new_bundle->singleton_dest_ = singleton_dest_;
00206 new_bundle->custody_rcpt_ = custody_rcpt_;
00207 new_bundle->receive_rcpt_ = receive_rcpt_;
00208 new_bundle->forward_rcpt_ = forward_rcpt_;
00209 new_bundle->delivery_rcpt_ = delivery_rcpt_;
00210 new_bundle->deletion_rcpt_ = deletion_rcpt_;
00211 new_bundle->app_acked_rcpt_ = app_acked_rcpt_;
00212 new_bundle->creation_ts_ = creation_ts_;
00213 new_bundle->expiration_ = expiration_;
00214 }
00215
00216
00217 int
00218 Bundle::add_ref(const char* what1, const char* what2)
00219 {
00220 (void)what1;
00221 (void)what2;
00222
00223 oasys::ScopeLock l(&lock_, "Bundle::add_ref");
00224
00225 ASSERTF(freed_ == false, "Bundle::add_ref on bundle %d (%p)"
00226 "called when bundle is already being freed!", bundleid_, this);
00227
00228 ASSERT(refcount_ >= 0);
00229 int ret = ++refcount_;
00230 log_debug_p("/dtn/bundle/refs",
00231 "bundle id %d (%p): refcount %d -> %d (%zu mappings) add %s %s",
00232 bundleid_, this, refcount_ - 1, refcount_,
00233 mappings_.size(), what1, what2);
00234 return ret;
00235 }
00236
00237
00238 int
00239 Bundle::del_ref(const char* what1, const char* what2)
00240 {
00241 (void)what1;
00242 (void)what2;
00243
00244 oasys::ScopeLock l(&lock_, "Bundle::del_ref");
00245
00246 ASSERTF(freed_ == false, "Bundle::del_ref on bundle %d (%p)"
00247 "called when bundle is already being freed!", bundleid_, this);
00248
00249 int ret = --refcount_;
00250 log_debug_p("/dtn/bundle/refs",
00251 "bundle id %d (%p): refcount %d -> %d (%zu mappings) del %s %s",
00252 bundleid_, this, refcount_ + 1, refcount_,
00253 mappings_.size(), what1, what2);
00254
00255 if (refcount_ != 0) {
00256 return ret;
00257 }
00258
00259 freed_ = true;
00260
00261 log_debug_p("/dtn/bundle",
00262 "bundle id %d (%p): no more references, posting free event",
00263 bundleid_, this);
00264
00265 BundleDaemon::instance()->post(new BundleFreeEvent(this));
00266
00267 return 0;
00268 }
00269
00270
00271 Bundle::MappingsIterator
00272 Bundle::mappings_begin()
00273 {
00274 if (!lock_.is_locked_by_me())
00275 PANIC("Must lock Bundle before using mappings iterator");
00276
00277 return mappings_.begin();
00278 }
00279
00280
00281 Bundle::MappingsIterator
00282 Bundle::mappings_end()
00283 {
00284 if (!lock_.is_locked_by_me())
00285 PANIC("Must lock Bundle before using mappings iterator");
00286
00287 return mappings_.end();
00288 }
00289
00290
00291 bool
00292 Bundle::is_queued_on(BundleList* bundle_list)
00293 {
00294 oasys::ScopeLock l(&lock_, "Bundle::is_queued_on");
00295 return (mappings_.count(bundle_list) > 0);
00296 }
00297
00298
00299 bool
00300 Bundle::validate(oasys::StringBuffer* errbuf)
00301 {
00302 if (!source_.valid()) {
00303 errbuf->appendf("invalid source eid [%s]", source_.c_str());
00304 return false;
00305 }
00306
00307 if (!dest_.valid()) {
00308 errbuf->appendf("invalid dest eid [%s]", dest_.c_str());
00309 return false;
00310 }
00311
00312 if (!replyto_.valid()) {
00313 errbuf->appendf("invalid replyto eid [%s]", replyto_.c_str());
00314 return false;
00315 }
00316
00317 if (!custodian_.valid()) {
00318 errbuf->appendf("invalid custodian eid [%s]", custodian_.c_str());
00319 return false;
00320 }
00321
00322 return true;
00323
00324 }
00325
00326 }