FragmentManager.cc

Go to the documentation of this file.
00001 /*
00002  * IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING. By
00003  * downloading, copying, installing or using the software you agree to
00004  * this license. If you do not agree to this license, do not download,
00005  * install, copy or use the software.
00006  * 
00007  * Intel Open Source License 
00008  * 
00009  * Copyright (c) 2004 Intel Corporation. All rights reserved. 
00010  * 
00011  * Redistribution and use in source and binary forms, with or without
00012  * modification, are permitted provided that the following conditions are
00013  * met:
00014  * 
00015  *   Redistributions of source code must retain the above copyright
00016  *   notice, this list of conditions and the following disclaimer.
00017  * 
00018  *   Redistributions in binary form must reproduce the above copyright
00019  *   notice, this list of conditions and the following disclaimer in the
00020  *   documentation and/or other materials provided with the distribution.
00021  * 
00022  *   Neither the name of the Intel Corporation nor the names of its
00023  *   contributors may be used to endorse or promote products derived from
00024  *   this software without specific prior written permission.
00025  *  
00026  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00027  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00028  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
00029  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE INTEL OR
00030  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00031  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00032  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00033  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00034  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00035  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00036  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00037  */
00038 
00039 #include "Bundle.h"
00040 #include "BundleEvent.h"
00041 #include "BundleDaemon.h"
00042 #include "BundleList.h"
00043 #include "BundleRef.h"
00044 #include "FragmentManager.h"
00045 
00046 namespace dtn {
00047 
00048 //----------------------------------------------------------------------
00049 FragmentManager::FragmentManager()
00050     : Logger("FragmentManager", "/dtn/bundle/fragmentation")
00051 {
00052 }
00053 
00054 //----------------------------------------------------------------------
00055 Bundle* 
00056 FragmentManager::create_fragment(Bundle* bundle, size_t offset, size_t length)
00057 {
00058     Bundle* fragment = new Bundle();
00059 
00060     // copy the metadata into the new  fragment (which can be further fragmented)
00061     bundle->copy_metadata(fragment);
00062     fragment->is_fragment_     = true;
00063     fragment->do_not_fragment_ = false;
00064     
00065     // initialize the fragment's orig_length and figure out the offset
00066     // into the payload
00067     if (! bundle->is_fragment_) {
00068         fragment->orig_length_ = bundle->payload_.length();
00069         fragment->frag_offset_ = offset;
00070     } else {
00071         fragment->orig_length_ = bundle->orig_length_;
00072         fragment->frag_offset_ = bundle->frag_offset_ + offset;
00073     }
00074 
00075     // check for overallocated length
00076     if ((offset + length) > fragment->orig_length_) {
00077         PANIC("fragment length overrun: "
00078               "orig_length %u frag_offset %u requested offset %zu length %zu",
00079               fragment->orig_length_, fragment->frag_offset_,
00080               offset, length);
00081     }
00082 
00083 
00084     // initialize payload
00085     fragment->payload_.set_length(length);
00086     fragment->payload_.write_data(&bundle->payload_, offset, length, 0);
00087     fragment->payload_.close_file();
00088 
00089     return fragment;
00090 }
00091 
00092 //----------------------------------------------------------------------
00093 void
00094 FragmentManager::convert_to_fragment(Bundle* bundle, size_t length)
00095 {
00096     size_t payload_len = bundle->payload_.length();
00097     ASSERT(payload_len > length);
00098 
00099     if (! bundle->is_fragment_) {
00100         bundle->is_fragment_ = true;
00101         bundle->orig_length_ = payload_len;
00102         bundle->frag_offset_ = 0;
00103     } else {
00104         // if it was already a fragment, the fragment headers are
00105         // already correct
00106     }
00107 
00108     // truncate the payload
00109     bundle->payload_.truncate(length);
00110 }
00111 
00112 //----------------------------------------------------------------------
00113 void
00114 FragmentManager::get_hash_key(const Bundle* bundle, std::string* key)
00115 {
00116     char buf[128];
00117     snprintf(buf, 128, "%u.%u",
00118              bundle->creation_ts_.seconds_,
00119              bundle->creation_ts_.seqno_);
00120     
00121     key->append(buf);
00122     key->append(bundle->source_.c_str());
00123     key->append(bundle->dest_.c_str());
00124 }
00125 
00126 //----------------------------------------------------------------------
00127 bool
00128 FragmentManager::check_completed(ReassemblyState* state)
00129 {
00130     Bundle* fragment;
00131     BundleList::iterator iter;
00132     oasys::ScopeLock l(state->fragments_.lock(),
00133                        "FragmentManager::check_completed");
00134     
00135     size_t done_up_to = 0;  // running total of completed reassembly
00136     size_t f_len;
00137     size_t f_offset;
00138     size_t f_origlen;
00139 
00140     size_t total_len = state->bundle_->payload_.length();
00141     
00142     int fragi = 0;
00143     int fragn = state->fragments_.size();
00144     (void)fragn; // in case NDEBUG is defined
00145 
00146     for (iter = state->fragments_.begin();
00147          iter != state->fragments_.end();
00148          ++iter, ++fragi)
00149     {
00150         fragment = *iter;
00151 
00152         f_len = fragment->payload_.length();
00153         f_offset = fragment->frag_offset_;
00154         f_origlen = fragment->orig_length_;
00155         
00156         ASSERT(fragment->is_fragment_);
00157         
00158         if (f_origlen != total_len) {
00159             PANIC("check_completed: error fragment orig len %zu != total %zu",
00160                   f_origlen, total_len);
00161             // XXX/demmer deal with this
00162         }
00163 
00164         if (done_up_to == f_offset) {
00165             /*
00166              * fragment is adjacent to the bytes so far
00167              * bbbbbbbbbb
00168              *           fff
00169              */
00170             log_debug("check_completed fragment %d/%d: "
00171                       "offset %zu len %zu total %zu done_up_to %zu: "
00172                       "(perfect fit)",
00173                       fragi, fragn, f_offset, f_len, f_origlen, done_up_to);
00174             done_up_to += f_len;
00175         }
00176 
00177         else if (done_up_to < f_offset) {
00178             /*
00179              * there's a gap
00180              * bbbbbbb ffff
00181              */
00182             log_debug("check_completed fragment %d/%d: "
00183                       "offset %zu len %zu total %zu done_up_to %zu: "
00184                       "(found a hole)",
00185                       fragi, fragn, f_offset, f_len, f_origlen, done_up_to);
00186             return false;
00187 
00188         }
00189 
00190         else if (done_up_to > (f_offset + f_len)) {
00191             /* fragment is completely redundant, skip
00192              * bbbbbbbbbb
00193              *      fffff
00194              */
00195             log_debug("check_completed fragment %d/%d: "
00196                       "offset %zu len %zu total %zu done_up_to %zu: "
00197                       "(redundant fragment)",
00198                       fragi, fragn, f_offset, f_len, f_origlen, done_up_to);
00199             continue;
00200         }
00201         
00202         else if (done_up_to > f_offset) {
00203             /*
00204              * there's some overlap, so reduce f_len accordingly
00205              * bbbbbbbbbb
00206              *      fffffff
00207              */
00208             log_debug("check_completed fragment %d/%d: "
00209                       "offset %zu len %zu total %zu done_up_to %zu: "
00210                       "(overlapping fragment, reducing len to %zu)",
00211                       fragi, fragn, f_offset, f_len, f_origlen, done_up_to,
00212                       (f_len - (done_up_to - f_offset)));
00213             
00214             f_len -= (done_up_to - f_offset);
00215             done_up_to += f_len;
00216         }
00217 
00218         else {
00219             // all cases should be covered above
00220             NOTREACHED;
00221         }
00222     }
00223 
00224     if (done_up_to == total_len) {
00225         log_debug("check_completed reassembly complete!");
00226         return true;
00227     } else {
00228         log_debug("check_completed reassembly not done (got %zu/%zu)",
00229                   done_up_to, total_len);
00230         return false;
00231     }
00232 }
00233 
00234 //----------------------------------------------------------------------
00235 int
00236 FragmentManager::proactively_fragment(Bundle* bundle, size_t max_length)
00237 {
00238     size_t payload_len = bundle->payload_.length();
00239     
00240     if (max_length == 0 || max_length > payload_len) {
00241         return 0;
00242     }
00243 
00244     log_info("proactively fragmenting "
00245          "%zu byte bundle into %zu %zu byte fragments",
00246          payload_len, (payload_len / max_length), max_length);
00247 
00248     Bundle* fragment;
00249     size_t todo = payload_len;
00250     size_t offset = 0;
00251     size_t fraglen = max_length;
00252     size_t count = 0;
00253     
00254     do {
00255         if ((offset + fraglen) > payload_len) {
00256             fraglen = payload_len - offset; // tail
00257         }
00258         ASSERT(todo >= fraglen);
00259         
00260         fragment = create_fragment(bundle, offset, fraglen);
00261         ASSERT(fragment);
00262         
00263         BundleDaemon::post(
00264             new BundleReceivedEvent(fragment,
00265                                     EVENTSRC_FRAGMENTATION,
00266                                     fraglen));
00267         offset += fraglen;
00268         todo -= fraglen;
00269         ++count;
00270         
00271     } while (todo > 0);
00272 
00273     bundle->payload_.close_file();
00274     return count;
00275 }
00276 
00277 //----------------------------------------------------------------------
00278 bool
00279 FragmentManager::try_to_reactively_fragment(Bundle* bundle, size_t bytes_sent)
00280 {
00281     size_t payload_len = bundle->payload_.length();
00282     
00283     if ((bytes_sent == 0) || (bytes_sent == payload_len))
00284     {
00285         return false; // nothing to do
00286     }
00287     
00288     size_t frag_off = bytes_sent;
00289     size_t frag_len = payload_len - bytes_sent;
00290 
00291     log_debug("creating reactive fragment (offset %zu len %zu/%zu)",
00292               frag_off, frag_len, payload_len);
00293     
00294     Bundle* tail = create_fragment(bundle, frag_off, frag_len);
00295     bundle->payload_.close_file();
00296 
00297     // treat the new fragment as if it just arrived
00298     BundleDaemon::post_at_head(
00299         new BundleReceivedEvent(tail, EVENTSRC_FRAGMENTATION, frag_len));
00300 
00301     return true;
00302 }
00303 
00304 //----------------------------------------------------------------------
00305 void
00306 FragmentManager::process_for_reassembly(Bundle* fragment)
00307 {
00308     ReassemblyState* state;
00309     ReassemblyTable::iterator iter;
00310 
00311     ASSERT(fragment->is_fragment_);
00312 
00313     // cons up the key to do the table lookup and look for reassembly state
00314     std::string hash_key;
00315     get_hash_key(fragment, &hash_key);
00316     iter = reassembly_table_.find(hash_key);
00317 
00318     log_debug("processing bundle fragment id=%u hash=%s %d",
00319               fragment->bundleid_, hash_key.c_str(), fragment->is_fragment_);
00320 
00321     if (iter == reassembly_table_.end()) {
00322         log_debug("no reassembly state for key %s -- creating new state",
00323                   hash_key.c_str());
00324         state = new ReassemblyState();
00325 
00326         // copy the metadata from the first fragment to arrive, but
00327         // make sure we mark the bundle that it's not a fragment (or
00328         // at least won't be for long)
00329         state->bundle_ = new Bundle();
00330         fragment->copy_metadata(state->bundle_.object());
00331         state->bundle_->is_fragment_ = false;
00332         
00333         state->bundle_->payload_.set_length(fragment->orig_length_,
00334                                             BundlePayload::DISK);
00335         reassembly_table_[hash_key] = state;
00336     } else {
00337         state = iter->second;
00338         log_debug("found reassembly state for key %s (%zu fragments)",
00339                   hash_key.c_str(), state->fragments_.size());
00340 
00341         state->bundle_->payload_.reopen_file();
00342     }
00343 
00344     // stick the fragment on the reassembly list
00345     state->fragments_.insert_sorted(fragment, BundleList::SORT_FRAG_OFFSET);
00346     
00347     // store the fragment data in the partially reassembled bundle file
00348     size_t fraglen = fragment->payload_.length();
00349     
00350     log_debug("write_data: length_=%zu src_offset=%u dst_offset=%u len %zu",
00351               state->bundle_->payload_.length(), 
00352               0, fragment->frag_offset_, fraglen);
00353 
00354     state->bundle_->payload_.write_data(&fragment->payload_, 0, fraglen,
00355                                         fragment->frag_offset_);
00356     state->bundle_->payload_.close_file();
00357     
00358     // check see if we're done
00359     if (!check_completed(state)) {
00360         return;
00361     }
00362 
00363     BundleDaemon::post_at_head
00364         (new ReassemblyCompletedEvent(state->bundle_.object(),
00365                                       &state->fragments_));
00366     ASSERT(state->fragments_.size() == 0); // moved into the event
00367     reassembly_table_.erase(hash_key);
00368     delete state;
00369 }
00370 
00371 
00372 
00373 } // namespace dtn

Generated on Fri Dec 22 14:47:59 2006 for DTN Reference Implementation by  doxygen 1.5.1