00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include "Bundle.h"
00040 #include "BundleEvent.h"
00041 #include "BundleDaemon.h"
00042 #include "BundleList.h"
00043 #include "BundleRef.h"
00044 #include "FragmentManager.h"
00045
00046 namespace dtn {
00047
00048
00049 FragmentManager::FragmentManager()
00050 : Logger("FragmentManager", "/dtn/bundle/fragmentation")
00051 {
00052 }
00053
00054
00055 Bundle*
00056 FragmentManager::create_fragment(Bundle* bundle, size_t offset, size_t length)
00057 {
00058 Bundle* fragment = new Bundle();
00059
00060
00061 bundle->copy_metadata(fragment);
00062 fragment->is_fragment_ = true;
00063 fragment->do_not_fragment_ = false;
00064
00065
00066
00067 if (! bundle->is_fragment_) {
00068 fragment->orig_length_ = bundle->payload_.length();
00069 fragment->frag_offset_ = offset;
00070 } else {
00071 fragment->orig_length_ = bundle->orig_length_;
00072 fragment->frag_offset_ = bundle->frag_offset_ + offset;
00073 }
00074
00075
00076 if ((offset + length) > fragment->orig_length_) {
00077 PANIC("fragment length overrun: "
00078 "orig_length %u frag_offset %u requested offset %zu length %zu",
00079 fragment->orig_length_, fragment->frag_offset_,
00080 offset, length);
00081 }
00082
00083
00084
00085 fragment->payload_.set_length(length);
00086 fragment->payload_.write_data(&bundle->payload_, offset, length, 0);
00087 fragment->payload_.close_file();
00088
00089 return fragment;
00090 }
00091
00092
00093 void
00094 FragmentManager::convert_to_fragment(Bundle* bundle, size_t length)
00095 {
00096 size_t payload_len = bundle->payload_.length();
00097 ASSERT(payload_len > length);
00098
00099 if (! bundle->is_fragment_) {
00100 bundle->is_fragment_ = true;
00101 bundle->orig_length_ = payload_len;
00102 bundle->frag_offset_ = 0;
00103 } else {
00104
00105
00106 }
00107
00108
00109 bundle->payload_.truncate(length);
00110 }
00111
00112
00113 void
00114 FragmentManager::get_hash_key(const Bundle* bundle, std::string* key)
00115 {
00116 char buf[128];
00117 snprintf(buf, 128, "%u.%u",
00118 bundle->creation_ts_.seconds_,
00119 bundle->creation_ts_.seqno_);
00120
00121 key->append(buf);
00122 key->append(bundle->source_.c_str());
00123 key->append(bundle->dest_.c_str());
00124 }
00125
00126
00127 bool
00128 FragmentManager::check_completed(ReassemblyState* state)
00129 {
00130 Bundle* fragment;
00131 BundleList::iterator iter;
00132 oasys::ScopeLock l(state->fragments_.lock(),
00133 "FragmentManager::check_completed");
00134
00135 size_t done_up_to = 0;
00136 size_t f_len;
00137 size_t f_offset;
00138 size_t f_origlen;
00139
00140 size_t total_len = state->bundle_->payload_.length();
00141
00142 int fragi = 0;
00143 int fragn = state->fragments_.size();
00144 (void)fragn;
00145
00146 for (iter = state->fragments_.begin();
00147 iter != state->fragments_.end();
00148 ++iter, ++fragi)
00149 {
00150 fragment = *iter;
00151
00152 f_len = fragment->payload_.length();
00153 f_offset = fragment->frag_offset_;
00154 f_origlen = fragment->orig_length_;
00155
00156 ASSERT(fragment->is_fragment_);
00157
00158 if (f_origlen != total_len) {
00159 PANIC("check_completed: error fragment orig len %zu != total %zu",
00160 f_origlen, total_len);
00161
00162 }
00163
00164 if (done_up_to == f_offset) {
00165
00166
00167
00168
00169
00170 log_debug("check_completed fragment %d/%d: "
00171 "offset %zu len %zu total %zu done_up_to %zu: "
00172 "(perfect fit)",
00173 fragi, fragn, f_offset, f_len, f_origlen, done_up_to);
00174 done_up_to += f_len;
00175 }
00176
00177 else if (done_up_to < f_offset) {
00178
00179
00180
00181
00182 log_debug("check_completed fragment %d/%d: "
00183 "offset %zu len %zu total %zu done_up_to %zu: "
00184 "(found a hole)",
00185 fragi, fragn, f_offset, f_len, f_origlen, done_up_to);
00186 return false;
00187
00188 }
00189
00190 else if (done_up_to > (f_offset + f_len)) {
00191
00192
00193
00194
00195 log_debug("check_completed fragment %d/%d: "
00196 "offset %zu len %zu total %zu done_up_to %zu: "
00197 "(redundant fragment)",
00198 fragi, fragn, f_offset, f_len, f_origlen, done_up_to);
00199 continue;
00200 }
00201
00202 else if (done_up_to > f_offset) {
00203
00204
00205
00206
00207
00208 log_debug("check_completed fragment %d/%d: "
00209 "offset %zu len %zu total %zu done_up_to %zu: "
00210 "(overlapping fragment, reducing len to %zu)",
00211 fragi, fragn, f_offset, f_len, f_origlen, done_up_to,
00212 (f_len - (done_up_to - f_offset)));
00213
00214 f_len -= (done_up_to - f_offset);
00215 done_up_to += f_len;
00216 }
00217
00218 else {
00219
00220 NOTREACHED;
00221 }
00222 }
00223
00224 if (done_up_to == total_len) {
00225 log_debug("check_completed reassembly complete!");
00226 return true;
00227 } else {
00228 log_debug("check_completed reassembly not done (got %zu/%zu)",
00229 done_up_to, total_len);
00230 return false;
00231 }
00232 }
00233
00234
00235 int
00236 FragmentManager::proactively_fragment(Bundle* bundle, size_t max_length)
00237 {
00238 size_t payload_len = bundle->payload_.length();
00239
00240 if (max_length == 0 || max_length > payload_len) {
00241 return 0;
00242 }
00243
00244 log_info("proactively fragmenting "
00245 "%zu byte bundle into %zu %zu byte fragments",
00246 payload_len, (payload_len / max_length), max_length);
00247
00248 Bundle* fragment;
00249 size_t todo = payload_len;
00250 size_t offset = 0;
00251 size_t fraglen = max_length;
00252 size_t count = 0;
00253
00254 do {
00255 if ((offset + fraglen) > payload_len) {
00256 fraglen = payload_len - offset;
00257 }
00258 ASSERT(todo >= fraglen);
00259
00260 fragment = create_fragment(bundle, offset, fraglen);
00261 ASSERT(fragment);
00262
00263 BundleDaemon::post(
00264 new BundleReceivedEvent(fragment,
00265 EVENTSRC_FRAGMENTATION,
00266 fraglen));
00267 offset += fraglen;
00268 todo -= fraglen;
00269 ++count;
00270
00271 } while (todo > 0);
00272
00273 bundle->payload_.close_file();
00274 return count;
00275 }
00276
00277
00278 bool
00279 FragmentManager::try_to_reactively_fragment(Bundle* bundle, size_t bytes_sent)
00280 {
00281 size_t payload_len = bundle->payload_.length();
00282
00283 if ((bytes_sent == 0) || (bytes_sent == payload_len))
00284 {
00285 return false;
00286 }
00287
00288 size_t frag_off = bytes_sent;
00289 size_t frag_len = payload_len - bytes_sent;
00290
00291 log_debug("creating reactive fragment (offset %zu len %zu/%zu)",
00292 frag_off, frag_len, payload_len);
00293
00294 Bundle* tail = create_fragment(bundle, frag_off, frag_len);
00295 bundle->payload_.close_file();
00296
00297
00298 BundleDaemon::post_at_head(
00299 new BundleReceivedEvent(tail, EVENTSRC_FRAGMENTATION, frag_len));
00300
00301 return true;
00302 }
00303
00304
00305 void
00306 FragmentManager::process_for_reassembly(Bundle* fragment)
00307 {
00308 ReassemblyState* state;
00309 ReassemblyTable::iterator iter;
00310
00311 ASSERT(fragment->is_fragment_);
00312
00313
00314 std::string hash_key;
00315 get_hash_key(fragment, &hash_key);
00316 iter = reassembly_table_.find(hash_key);
00317
00318 log_debug("processing bundle fragment id=%u hash=%s %d",
00319 fragment->bundleid_, hash_key.c_str(), fragment->is_fragment_);
00320
00321 if (iter == reassembly_table_.end()) {
00322 log_debug("no reassembly state for key %s -- creating new state",
00323 hash_key.c_str());
00324 state = new ReassemblyState();
00325
00326
00327
00328
00329 state->bundle_ = new Bundle();
00330 fragment->copy_metadata(state->bundle_.object());
00331 state->bundle_->is_fragment_ = false;
00332
00333 state->bundle_->payload_.set_length(fragment->orig_length_,
00334 BundlePayload::DISK);
00335 reassembly_table_[hash_key] = state;
00336 } else {
00337 state = iter->second;
00338 log_debug("found reassembly state for key %s (%zu fragments)",
00339 hash_key.c_str(), state->fragments_.size());
00340
00341 state->bundle_->payload_.reopen_file();
00342 }
00343
00344
00345 state->fragments_.insert_sorted(fragment, BundleList::SORT_FRAG_OFFSET);
00346
00347
00348 size_t fraglen = fragment->payload_.length();
00349
00350 log_debug("write_data: length_=%zu src_offset=%u dst_offset=%u len %zu",
00351 state->bundle_->payload_.length(),
00352 0, fragment->frag_offset_, fraglen);
00353
00354 state->bundle_->payload_.write_data(&fragment->payload_, 0, fraglen,
00355 fragment->frag_offset_);
00356 state->bundle_->payload_.close_file();
00357
00358
00359 if (!check_completed(state)) {
00360 return;
00361 }
00362
00363 BundleDaemon::post_at_head
00364 (new ReassemblyCompletedEvent(state->bundle_.object(),
00365 &state->fragments_));
00366 ASSERT(state->fragments_.size() == 0);
00367 reassembly_table_.erase(hash_key);
00368 delete state;
00369 }
00370
00371
00372
00373 }