FragmentManager.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2004-2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 
00018 #include "Bundle.h"
00019 #include "BundleEvent.h"
00020 #include "BundleDaemon.h"
00021 #include "BundleList.h"
00022 #include "BundleRef.h"
00023 #include "FragmentManager.h"
00024 
00025 namespace dtn {
00026 
00027 //----------------------------------------------------------------------
00028 FragmentManager::FragmentManager()
00029     : Logger("FragmentManager", "/dtn/bundle/fragmentation")
00030 {
00031 }
00032 
00033 //----------------------------------------------------------------------
00034 Bundle* 
00035 FragmentManager::create_fragment(Bundle* bundle, size_t offset, size_t length)
00036 {
00037     Bundle* fragment = new Bundle();
00038 
00039     // copy the metadata into the new  fragment (which can be further fragmented)
00040     bundle->copy_metadata(fragment);
00041     fragment->is_fragment_     = true;
00042     fragment->do_not_fragment_ = false;
00043     
00044     // initialize the fragment's orig_length and figure out the offset
00045     // into the payload
00046     if (! bundle->is_fragment_) {
00047         fragment->orig_length_ = bundle->payload_.length();
00048         fragment->frag_offset_ = offset;
00049     } else {
00050         fragment->orig_length_ = bundle->orig_length_;
00051         fragment->frag_offset_ = bundle->frag_offset_ + offset;
00052     }
00053 
00054     // check for overallocated length
00055     if ((offset + length) > fragment->orig_length_) {
00056         PANIC("fragment length overrun: "
00057               "orig_length %u frag_offset %u requested offset %zu length %zu",
00058               fragment->orig_length_, fragment->frag_offset_,
00059               offset, length);
00060     }
00061 
00062 
00063     // initialize payload
00064     fragment->payload_.set_length(length);
00065     fragment->payload_.write_data(&bundle->payload_, offset, length, 0);
00066 
00067     return fragment;
00068 }
00069 
00070 //----------------------------------------------------------------------
00071 bool
00072 FragmentManager::try_to_convert_to_fragment(Bundle* bundle,
00073                                             size_t  payload_offset,
00074                                             size_t  bytes_rcvd)
00075 {
00076     if (bytes_rcvd <= payload_offset) {
00077         return false; // can't do anything
00078     }
00079     
00080     size_t payload_len  = bundle->payload_.length();
00081     size_t payload_rcvd = std::min(payload_len, bytes_rcvd - payload_offset);
00082 
00083     if (payload_rcvd >= payload_len) {
00084         return false; // nothing to do
00085     }
00086     
00087     log_debug("partial bundle *%p, making reactive fragment of %zu bytes",
00088               bundle, bytes_rcvd);
00089         
00090     if (! bundle->is_fragment_) {
00091         bundle->is_fragment_ = true;
00092         bundle->orig_length_ = payload_len;
00093         bundle->frag_offset_ = 0;
00094     } else {
00095         // if it was already a fragment, the fragment headers are
00096         // already correct
00097     }
00098     
00099     bundle->payload_.truncate(payload_rcvd);
00100     
00101     return true;
00102 }
00103 
00104 //----------------------------------------------------------------------
00105 void
00106 FragmentManager::get_hash_key(const Bundle* bundle, std::string* key)
00107 {
00108     char buf[128];
00109     snprintf(buf, 128, "%u.%u",
00110              bundle->creation_ts_.seconds_,
00111              bundle->creation_ts_.seqno_);
00112     
00113     key->append(buf);
00114     key->append(bundle->source_.c_str());
00115     key->append(bundle->dest_.c_str());
00116 }
00117 
00118 //----------------------------------------------------------------------
00119 bool
00120 FragmentManager::check_completed(ReassemblyState* state)
00121 {
00122     Bundle* fragment;
00123     BundleList::iterator iter;
00124     oasys::ScopeLock l(state->fragments_.lock(),
00125                        "FragmentManager::check_completed");
00126     
00127     size_t done_up_to = 0;  // running total of completed reassembly
00128     size_t f_len;
00129     size_t f_offset;
00130     size_t f_origlen;
00131 
00132     size_t total_len = state->bundle_->payload_.length();
00133     
00134     int fragi = 0;
00135     int fragn = state->fragments_.size();
00136     (void)fragn; // in case NDEBUG is defined
00137 
00138     for (iter = state->fragments_.begin();
00139          iter != state->fragments_.end();
00140          ++iter, ++fragi)
00141     {
00142         fragment = *iter;
00143 
00144         f_len = fragment->payload_.length();
00145         f_offset = fragment->frag_offset_;
00146         f_origlen = fragment->orig_length_;
00147         
00148         ASSERT(fragment->is_fragment_);
00149         
00150         if (f_origlen != total_len) {
00151             PANIC("check_completed: error fragment orig len %zu != total %zu",
00152                   f_origlen, total_len);
00153             // XXX/demmer deal with this
00154         }
00155 
00156         if (done_up_to == f_offset) {
00157             /*
00158              * fragment is adjacent to the bytes so far
00159              * bbbbbbbbbb
00160              *           fff
00161              */
00162             log_debug("check_completed fragment %d/%d: "
00163                       "offset %zu len %zu total %zu done_up_to %zu: "
00164                       "(perfect fit)",
00165                       fragi, fragn, f_offset, f_len, f_origlen, done_up_to);
00166             done_up_to += f_len;
00167         }
00168 
00169         else if (done_up_to < f_offset) {
00170             /*
00171              * there's a gap
00172              * bbbbbbb ffff
00173              */
00174             log_debug("check_completed fragment %d/%d: "
00175                       "offset %zu len %zu total %zu done_up_to %zu: "
00176                       "(found a hole)",
00177                       fragi, fragn, f_offset, f_len, f_origlen, done_up_to);
00178             return false;
00179 
00180         }
00181 
00182         else if (done_up_to > (f_offset + f_len)) {
00183             /* fragment is completely redundant, skip
00184              * bbbbbbbbbb
00185              *      fffff
00186              */
00187             log_debug("check_completed fragment %d/%d: "
00188                       "offset %zu len %zu total %zu done_up_to %zu: "
00189                       "(redundant fragment)",
00190                       fragi, fragn, f_offset, f_len, f_origlen, done_up_to);
00191             continue;
00192         }
00193         
00194         else if (done_up_to > f_offset) {
00195             /*
00196              * there's some overlap, so reduce f_len accordingly
00197              * bbbbbbbbbb
00198              *      fffffff
00199              */
00200             log_debug("check_completed fragment %d/%d: "
00201                       "offset %zu len %zu total %zu done_up_to %zu: "
00202                       "(overlapping fragment, reducing len to %zu)",
00203                       fragi, fragn, f_offset, f_len, f_origlen, done_up_to,
00204                       (f_len - (done_up_to - f_offset)));
00205             
00206             f_len -= (done_up_to - f_offset);
00207             done_up_to += f_len;
00208         }
00209 
00210         else {
00211             // all cases should be covered above
00212             NOTREACHED;
00213         }
00214     }
00215 
00216     if (done_up_to == total_len) {
00217         log_debug("check_completed reassembly complete!");
00218         return true;
00219     } else {
00220         log_debug("check_completed reassembly not done (got %zu/%zu)",
00221                   done_up_to, total_len);
00222         return false;
00223     }
00224 }
00225 
00226 //----------------------------------------------------------------------
00227 int
00228 FragmentManager::proactively_fragment(Bundle* bundle, size_t max_length)
00229 {
00230     size_t payload_len = bundle->payload_.length();
00231     
00232     if (max_length == 0 || max_length > payload_len) {
00233         return 0;
00234     }
00235 
00236     log_info("proactively fragmenting "
00237          "%zu byte bundle into %zu %zu byte fragments",
00238          payload_len, (payload_len / max_length), max_length);
00239 
00240     Bundle* fragment;
00241     size_t todo = payload_len;
00242     size_t offset = 0;
00243     size_t fraglen = max_length;
00244     size_t count = 0;
00245     
00246     do {
00247         if ((offset + fraglen) > payload_len) {
00248             fraglen = payload_len - offset; // tail
00249         }
00250         ASSERT(todo >= fraglen);
00251         
00252         fragment = create_fragment(bundle, offset, fraglen);
00253         ASSERT(fragment);
00254         
00255         BundleDaemon::post(
00256             new BundleReceivedEvent(fragment, EVENTSRC_FRAGMENTATION));
00257         offset += fraglen;
00258         todo -= fraglen;
00259         ++count;
00260         
00261     } while (todo > 0);
00262 
00263     return count;
00264 }
00265 
00266 //----------------------------------------------------------------------
00267 bool
00268 FragmentManager::try_to_reactively_fragment(Bundle* bundle,
00269                                             size_t  payload_offset,
00270                                             size_t  bytes_sent)
00271 {
00272     if (bytes_sent <= payload_offset) {
00273         return false; // can't do anything
00274     }
00275     
00276     size_t payload_len  = bundle->payload_.length();
00277     size_t payload_sent = std::min(payload_len, bytes_sent - payload_offset);
00278     
00279     if (payload_sent >= payload_len) {
00280         return false; // nothing to do
00281     }
00282     
00283     size_t frag_off = payload_sent;
00284     size_t frag_len = payload_len - payload_sent;
00285 
00286     log_debug("creating reactive fragment (offset %zu len %zu/%zu)",
00287               frag_off, frag_len, payload_len);
00288     
00289     Bundle* tail = create_fragment(bundle, frag_off, frag_len);
00290 
00291     // treat the new fragment as if it just arrived
00292     BundleDaemon::post_at_head(
00293         new BundleReceivedEvent(tail, EVENTSRC_FRAGMENTATION));
00294 
00295     return true;
00296 }
00297 
00298 //----------------------------------------------------------------------
00299 void
00300 FragmentManager::process_for_reassembly(Bundle* fragment)
00301 {
00302     ReassemblyState* state;
00303     ReassemblyTable::iterator iter;
00304 
00305     ASSERT(fragment->is_fragment_);
00306 
00307     // cons up the key to do the table lookup and look for reassembly state
00308     std::string hash_key;
00309     get_hash_key(fragment, &hash_key);
00310     iter = reassembly_table_.find(hash_key);
00311 
00312     log_debug("processing bundle fragment id=%u hash=%s %d",
00313               fragment->bundleid_, hash_key.c_str(), fragment->is_fragment_);
00314 
00315     if (iter == reassembly_table_.end()) {
00316         log_debug("no reassembly state for key %s -- creating new state",
00317                   hash_key.c_str());
00318         state = new ReassemblyState();
00319 
00320         // copy the metadata from the first fragment to arrive, but
00321         // make sure we mark the bundle that it's not a fragment (or
00322         // at least won't be for long)
00323         state->bundle_ = new Bundle();
00324         fragment->copy_metadata(state->bundle_.object());
00325         state->bundle_->is_fragment_ = false;
00326         
00327         state->bundle_->payload_.set_length(fragment->orig_length_,
00328                                             BundlePayload::DISK);
00329         reassembly_table_[hash_key] = state;
00330     } else {
00331         state = iter->second;
00332         log_debug("found reassembly state for key %s (%zu fragments)",
00333                   hash_key.c_str(), state->fragments_.size());
00334     }
00335 
00336     // stick the fragment on the reassembly list
00337     state->fragments_.insert_sorted(fragment, BundleList::SORT_FRAG_OFFSET);
00338     
00339     // store the fragment data in the partially reassembled bundle file
00340     size_t fraglen = fragment->payload_.length();
00341     
00342     log_debug("write_data: length_=%zu src_offset=%u dst_offset=%u len %zu",
00343               state->bundle_->payload_.length(), 
00344               0, fragment->frag_offset_, fraglen);
00345 
00346     state->bundle_->payload_.write_data(&fragment->payload_, 0, fraglen,
00347                                         fragment->frag_offset_);
00348     
00349     // check see if we're done
00350     if (!check_completed(state)) {
00351         return;
00352     }
00353 
00354     BundleDaemon::post_at_head
00355         (new ReassemblyCompletedEvent(state->bundle_.object(),
00356                                       &state->fragments_));
00357     ASSERT(state->fragments_.size() == 0); // moved into the event
00358     reassembly_table_.erase(hash_key);
00359     delete state;
00360 }
00361 
00362 //----------------------------------------------------------------------
00363 void
00364 FragmentManager::delete_fragment(Bundle* fragment)
00365 {
00366     ReassemblyState* state;
00367     ReassemblyTable::iterator iter;
00368 
00369     ASSERT(fragment->is_fragment_);
00370 
00371     // cons up the key to do the table lookup and look for reassembly state
00372     std::string hash_key;
00373     get_hash_key(fragment, &hash_key);
00374     iter = reassembly_table_.find(hash_key);
00375 
00376     // no reassembly state, simply return
00377     if (iter == reassembly_table_.end()) {
00378         return;
00379     }
00380 
00381     state = iter->second;
00382 
00383     // remove the fragment from the reassembly list
00384     bool erased = state->fragments_.erase(fragment);
00385 
00386     // fragment was not in reassembly list, simply return
00387     if (!erased) {
00388         return;
00389     }
00390 
00391     // create a null buffer; the "null" character is used as padding in
00392     // the partially reassembled bundle file
00393     u_char buf[fragment->payload_.length()];
00394     memset(buf, '\0', fragment->payload_.length());
00395     
00396     // remove the fragment data from the partially reassembled bundle file
00397     state->bundle_->payload_.write_data(buf, fragment->frag_offset_,
00398                                         fragment->payload_.length());
00399 
00400     // delete reassembly state if no fragments now exist
00401     if (state->fragments_.size() == 0) {
00402         reassembly_table_.erase(hash_key);
00403         delete state;
00404     }
00405 }
00406 
00407 } // namespace dtn

Generated on Thu Jun 7 12:54:27 2007 for DTN Reference Implementation by  doxygen 1.5.1