00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include "Bundle.h"
00019 #include "BundleEvent.h"
00020 #include "BundleDaemon.h"
00021 #include "BundleList.h"
00022 #include "BundleRef.h"
00023 #include "FragmentManager.h"
00024
00025 namespace dtn {
00026
00027
00028 FragmentManager::FragmentManager()
00029 : Logger("FragmentManager", "/dtn/bundle/fragmentation")
00030 {
00031 }
00032
00033
00034 Bundle*
00035 FragmentManager::create_fragment(Bundle* bundle, size_t offset, size_t length)
00036 {
00037 Bundle* fragment = new Bundle();
00038
00039
00040 bundle->copy_metadata(fragment);
00041 fragment->is_fragment_ = true;
00042 fragment->do_not_fragment_ = false;
00043
00044
00045
00046 if (! bundle->is_fragment_) {
00047 fragment->orig_length_ = bundle->payload_.length();
00048 fragment->frag_offset_ = offset;
00049 } else {
00050 fragment->orig_length_ = bundle->orig_length_;
00051 fragment->frag_offset_ = bundle->frag_offset_ + offset;
00052 }
00053
00054
00055 if ((offset + length) > fragment->orig_length_) {
00056 PANIC("fragment length overrun: "
00057 "orig_length %u frag_offset %u requested offset %zu length %zu",
00058 fragment->orig_length_, fragment->frag_offset_,
00059 offset, length);
00060 }
00061
00062
00063
00064 fragment->payload_.set_length(length);
00065 fragment->payload_.write_data(&bundle->payload_, offset, length, 0);
00066
00067 return fragment;
00068 }
00069
00070
00071 bool
00072 FragmentManager::try_to_convert_to_fragment(Bundle* bundle,
00073 size_t payload_offset,
00074 size_t bytes_rcvd)
00075 {
00076 if (bytes_rcvd <= payload_offset) {
00077 return false;
00078 }
00079
00080 size_t payload_len = bundle->payload_.length();
00081 size_t payload_rcvd = std::min(payload_len, bytes_rcvd - payload_offset);
00082
00083 if (payload_rcvd >= payload_len) {
00084 return false;
00085 }
00086
00087 log_debug("partial bundle *%p, making reactive fragment of %zu bytes",
00088 bundle, bytes_rcvd);
00089
00090 if (! bundle->is_fragment_) {
00091 bundle->is_fragment_ = true;
00092 bundle->orig_length_ = payload_len;
00093 bundle->frag_offset_ = 0;
00094 } else {
00095
00096
00097 }
00098
00099 bundle->payload_.truncate(payload_rcvd);
00100
00101 return true;
00102 }
00103
00104
00105 void
00106 FragmentManager::get_hash_key(const Bundle* bundle, std::string* key)
00107 {
00108 char buf[128];
00109 snprintf(buf, 128, "%u.%u",
00110 bundle->creation_ts_.seconds_,
00111 bundle->creation_ts_.seqno_);
00112
00113 key->append(buf);
00114 key->append(bundle->source_.c_str());
00115 key->append(bundle->dest_.c_str());
00116 }
00117
00118
00119 bool
00120 FragmentManager::check_completed(ReassemblyState* state)
00121 {
00122 Bundle* fragment;
00123 BundleList::iterator iter;
00124 oasys::ScopeLock l(state->fragments_.lock(),
00125 "FragmentManager::check_completed");
00126
00127 size_t done_up_to = 0;
00128 size_t f_len;
00129 size_t f_offset;
00130 size_t f_origlen;
00131
00132 size_t total_len = state->bundle_->payload_.length();
00133
00134 int fragi = 0;
00135 int fragn = state->fragments_.size();
00136 (void)fragn;
00137
00138 for (iter = state->fragments_.begin();
00139 iter != state->fragments_.end();
00140 ++iter, ++fragi)
00141 {
00142 fragment = *iter;
00143
00144 f_len = fragment->payload_.length();
00145 f_offset = fragment->frag_offset_;
00146 f_origlen = fragment->orig_length_;
00147
00148 ASSERT(fragment->is_fragment_);
00149
00150 if (f_origlen != total_len) {
00151 PANIC("check_completed: error fragment orig len %zu != total %zu",
00152 f_origlen, total_len);
00153
00154 }
00155
00156 if (done_up_to == f_offset) {
00157
00158
00159
00160
00161
00162 log_debug("check_completed fragment %d/%d: "
00163 "offset %zu len %zu total %zu done_up_to %zu: "
00164 "(perfect fit)",
00165 fragi, fragn, f_offset, f_len, f_origlen, done_up_to);
00166 done_up_to += f_len;
00167 }
00168
00169 else if (done_up_to < f_offset) {
00170
00171
00172
00173
00174 log_debug("check_completed fragment %d/%d: "
00175 "offset %zu len %zu total %zu done_up_to %zu: "
00176 "(found a hole)",
00177 fragi, fragn, f_offset, f_len, f_origlen, done_up_to);
00178 return false;
00179
00180 }
00181
00182 else if (done_up_to > (f_offset + f_len)) {
00183
00184
00185
00186
00187 log_debug("check_completed fragment %d/%d: "
00188 "offset %zu len %zu total %zu done_up_to %zu: "
00189 "(redundant fragment)",
00190 fragi, fragn, f_offset, f_len, f_origlen, done_up_to);
00191 continue;
00192 }
00193
00194 else if (done_up_to > f_offset) {
00195
00196
00197
00198
00199
00200 log_debug("check_completed fragment %d/%d: "
00201 "offset %zu len %zu total %zu done_up_to %zu: "
00202 "(overlapping fragment, reducing len to %zu)",
00203 fragi, fragn, f_offset, f_len, f_origlen, done_up_to,
00204 (f_len - (done_up_to - f_offset)));
00205
00206 f_len -= (done_up_to - f_offset);
00207 done_up_to += f_len;
00208 }
00209
00210 else {
00211
00212 NOTREACHED;
00213 }
00214 }
00215
00216 if (done_up_to == total_len) {
00217 log_debug("check_completed reassembly complete!");
00218 return true;
00219 } else {
00220 log_debug("check_completed reassembly not done (got %zu/%zu)",
00221 done_up_to, total_len);
00222 return false;
00223 }
00224 }
00225
00226
00227 int
00228 FragmentManager::proactively_fragment(Bundle* bundle, size_t max_length)
00229 {
00230 size_t payload_len = bundle->payload_.length();
00231
00232 if (max_length == 0 || max_length > payload_len) {
00233 return 0;
00234 }
00235
00236 log_info("proactively fragmenting "
00237 "%zu byte bundle into %zu %zu byte fragments",
00238 payload_len, (payload_len / max_length), max_length);
00239
00240 Bundle* fragment;
00241 size_t todo = payload_len;
00242 size_t offset = 0;
00243 size_t fraglen = max_length;
00244 size_t count = 0;
00245
00246 do {
00247 if ((offset + fraglen) > payload_len) {
00248 fraglen = payload_len - offset;
00249 }
00250 ASSERT(todo >= fraglen);
00251
00252 fragment = create_fragment(bundle, offset, fraglen);
00253 ASSERT(fragment);
00254
00255 BundleDaemon::post(
00256 new BundleReceivedEvent(fragment, EVENTSRC_FRAGMENTATION));
00257 offset += fraglen;
00258 todo -= fraglen;
00259 ++count;
00260
00261 } while (todo > 0);
00262
00263 return count;
00264 }
00265
00266
00267 bool
00268 FragmentManager::try_to_reactively_fragment(Bundle* bundle,
00269 size_t payload_offset,
00270 size_t bytes_sent)
00271 {
00272 if (bytes_sent <= payload_offset) {
00273 return false;
00274 }
00275
00276 size_t payload_len = bundle->payload_.length();
00277 size_t payload_sent = std::min(payload_len, bytes_sent - payload_offset);
00278
00279 if (payload_sent >= payload_len) {
00280 return false;
00281 }
00282
00283 size_t frag_off = payload_sent;
00284 size_t frag_len = payload_len - payload_sent;
00285
00286 log_debug("creating reactive fragment (offset %zu len %zu/%zu)",
00287 frag_off, frag_len, payload_len);
00288
00289 Bundle* tail = create_fragment(bundle, frag_off, frag_len);
00290
00291
00292 BundleDaemon::post_at_head(
00293 new BundleReceivedEvent(tail, EVENTSRC_FRAGMENTATION));
00294
00295 return true;
00296 }
00297
00298
00299 void
00300 FragmentManager::process_for_reassembly(Bundle* fragment)
00301 {
00302 ReassemblyState* state;
00303 ReassemblyTable::iterator iter;
00304
00305 ASSERT(fragment->is_fragment_);
00306
00307
00308 std::string hash_key;
00309 get_hash_key(fragment, &hash_key);
00310 iter = reassembly_table_.find(hash_key);
00311
00312 log_debug("processing bundle fragment id=%u hash=%s %d",
00313 fragment->bundleid_, hash_key.c_str(), fragment->is_fragment_);
00314
00315 if (iter == reassembly_table_.end()) {
00316 log_debug("no reassembly state for key %s -- creating new state",
00317 hash_key.c_str());
00318 state = new ReassemblyState();
00319
00320
00321
00322
00323 state->bundle_ = new Bundle();
00324 fragment->copy_metadata(state->bundle_.object());
00325 state->bundle_->is_fragment_ = false;
00326
00327 state->bundle_->payload_.set_length(fragment->orig_length_,
00328 BundlePayload::DISK);
00329 reassembly_table_[hash_key] = state;
00330 } else {
00331 state = iter->second;
00332 log_debug("found reassembly state for key %s (%zu fragments)",
00333 hash_key.c_str(), state->fragments_.size());
00334 }
00335
00336
00337 state->fragments_.insert_sorted(fragment, BundleList::SORT_FRAG_OFFSET);
00338
00339
00340 size_t fraglen = fragment->payload_.length();
00341
00342 log_debug("write_data: length_=%zu src_offset=%u dst_offset=%u len %zu",
00343 state->bundle_->payload_.length(),
00344 0, fragment->frag_offset_, fraglen);
00345
00346 state->bundle_->payload_.write_data(&fragment->payload_, 0, fraglen,
00347 fragment->frag_offset_);
00348
00349
00350 if (!check_completed(state)) {
00351 return;
00352 }
00353
00354 BundleDaemon::post_at_head
00355 (new ReassemblyCompletedEvent(state->bundle_.object(),
00356 &state->fragments_));
00357 ASSERT(state->fragments_.size() == 0);
00358 reassembly_table_.erase(hash_key);
00359 delete state;
00360 }
00361
00362
00363 void
00364 FragmentManager::delete_fragment(Bundle* fragment)
00365 {
00366 ReassemblyState* state;
00367 ReassemblyTable::iterator iter;
00368
00369 ASSERT(fragment->is_fragment_);
00370
00371
00372 std::string hash_key;
00373 get_hash_key(fragment, &hash_key);
00374 iter = reassembly_table_.find(hash_key);
00375
00376
00377 if (iter == reassembly_table_.end()) {
00378 return;
00379 }
00380
00381 state = iter->second;
00382
00383
00384 bool erased = state->fragments_.erase(fragment);
00385
00386
00387 if (!erased) {
00388 return;
00389 }
00390
00391
00392
00393 u_char buf[fragment->payload_.length()];
00394 memset(buf, '\0', fragment->payload_.length());
00395
00396
00397 state->bundle_->payload_.write_data(buf, fragment->frag_offset_,
00398 fragment->payload_.length());
00399
00400
00401 if (state->fragments_.size() == 0) {
00402 reassembly_table_.erase(hash_key);
00403 delete state;
00404 }
00405 }
00406
00407 }