BundleProtocol.cc

Go to the documentation of this file.
00001 /*
00002  *    Copyright 2004-2006 Intel Corporation
00003  * 
00004  *    Licensed under the Apache License, Version 2.0 (the "License");
00005  *    you may not use this file except in compliance with the License.
00006  *    You may obtain a copy of the License at
00007  * 
00008  *        http://www.apache.org/licenses/LICENSE-2.0
00009  * 
00010  *    Unless required by applicable law or agreed to in writing, software
00011  *    distributed under the License is distributed on an "AS IS" BASIS,
00012  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
00013  *    See the License for the specific language governing permissions and
00014  *    limitations under the License.
00015  */
00016 
00017 
00018 #include <sys/types.h>
00019 #include <netinet/in.h>
00020 
00021 #include <oasys/debug/DebugUtils.h>
00022 #include <oasys/util/StringUtils.h>
00023 
00024 #include "BundleProtocol.h"
00025 #include "BlockInfo.h"
00026 #include "BlockProcessor.h"
00027 #include "Bundle.h"
00028 #include "BundleTimestamp.h"
00029 #include "PayloadBlockProcessor.h"
00030 #include "PreviousHopBlockProcessor.h"
00031 #include "PrimaryBlockProcessor.h"
00032 #include "SDNV.h"
00033 #include "UnknownBlockProcessor.h"
00034 
00035 namespace dtn {
00036 
// Log path passed as the first argument to every log_*_p() call in
// this file.
static const char* LOG = "/dtn/bundle/protocol";

// Table of registered block processors, indexed by block type code.
// A null entry means no processor is registered for that type;
// find_processor() substitutes the UnknownBlockProcessor in that case.
BlockProcessor* BundleProtocol::processors_[256];
00040 
00041 //----------------------------------------------------------------------
00042 void
00043 BundleProtocol::register_processor(BlockProcessor* bp)
00044 {
00045     // can't override an existing processor
00046     ASSERT(processors_[bp->block_type()] == 0);
00047     processors_[bp->block_type()] = bp;
00048 }
00049 
00050 //----------------------------------------------------------------------
00051 BlockProcessor*
00052 BundleProtocol::find_processor(u_int8_t type)
00053 {
00054     BlockProcessor* ret = processors_[type];
00055     if (ret == 0) {
00056         ret = UnknownBlockProcessor::instance();
00057     }
00058     return ret;
00059 }
00060 
00061 //----------------------------------------------------------------------
00062 void
00063 BundleProtocol::init_default_processors()
00064 {
00065     // register default block processor handlers
00066     BundleProtocol::register_processor(new PrimaryBlockProcessor());
00067     BundleProtocol::register_processor(new PreviousHopBlockProcessor());
00068     BundleProtocol::register_processor(new PayloadBlockProcessor());
00069 }
00070 
00071 //----------------------------------------------------------------------
00072 BlockInfoVec*
00073 BundleProtocol::prepare_blocks(Bundle* bundle, Link* link)
00074 {
00075     // create a new block list for the outgoing link by first calling
00076     // prepare on all the BlockProcessor classes for the blocks that
00077     // arrived on the link
00078     BlockInfoVec* xmit_blocks = bundle->xmit_blocks_.create_blocks(link);
00079     BlockInfoVec* recv_blocks = &bundle->recv_blocks_;
00080     BlockInfoVec* api_blocks = &bundle->api_blocks_;
00081 
00082     // if there is a received block, the first one better be the primary
00083     if (recv_blocks->size() > 0) {
00084         ASSERT(recv_blocks->front().type() == PRIMARY_BLOCK);
00085     }
00086     
00087     for (BlockInfoVec::iterator iter = recv_blocks->begin();
00088          iter != recv_blocks->end();
00089          ++iter)
00090     {
00091         iter->owner()->prepare(bundle, link, xmit_blocks, &*iter);
00092     }
00093 
00094     // now we also make sure to prepare() on any registered processors
00095     // that don't already have a block in the output list. this
00096     // handles the case where we have a locally generated block with
00097     // nothing in the recv_blocks vector
00098     //
00099     // XXX/demmer this needs some options for the router to select
00100     // which block elements should be in the list, i.e. security, etc
00101     for (int i = 0; i < 256; ++i) {
00102         BlockProcessor* bp = find_processor(i);
00103         if (bp == UnknownBlockProcessor::instance()) {
00104             continue;
00105         }
00106 
00107         if (! xmit_blocks->has_block(i)) {
00108             bp->prepare(bundle, link, xmit_blocks, NULL);
00109         }
00110     }
00111 
00112     // locally generated bundles need to include blocks specified at the API
00113     for (BlockInfoVec::iterator iter = api_blocks->begin();
00114          iter != api_blocks->end();
00115          ++iter)
00116     {
00117         iter->owner()->prepare(bundle, link, xmit_blocks, &*iter);
00118     }
00119 
00120     return xmit_blocks;
00121 }
00122 
00123 //----------------------------------------------------------------------
00124 size_t
00125 BundleProtocol::generate_blocks(Bundle*       bundle,
00126                                 BlockInfoVec* blocks,
00127                                 Link*         link)
00128 {
00129     // now assert there's at least 2 blocks (primary + payload) and
00130     // that the primary is first
00131     ASSERT(blocks->size() >= 2);
00132     ASSERT(blocks->front().type() == PRIMARY_BLOCK);
00133 
00134     // now we make another pass through the list and call generate on
00135     // each block processor
00136     BlockInfoVec::iterator last_block = blocks->end() - 1;
00137     for (BlockInfoVec::iterator iter = blocks->begin();
00138          iter != blocks->end();
00139          ++iter)
00140     {
00141         bool last = (iter == last_block);
00142         iter->owner()->generate(bundle, link, &*iter, last);
00143 
00144         log_debug_p(LOG, "generated block (owner 0x%x type 0x%x) "
00145                     "data_offset %u data_length %u contents length %zu",
00146                     iter->owner()->block_type(), iter->type(),
00147                     iter->data_offset(), iter->data_length(),
00148                     iter->contents().len());
00149         
00150         if (last) {
00151             ASSERT((iter->flags() & BLOCK_FLAG_LAST_BLOCK) != 0);
00152         } else {
00153             ASSERT((iter->flags() & BLOCK_FLAG_LAST_BLOCK) == 0);
00154         }
00155     }
00156     
00157     // make a final pass through, calling finalize() and extracting
00158     // the block length
00159     size_t total_len = 0;
00160     for (BlockInfoVec::iterator iter = blocks->begin();
00161          iter != blocks->end();
00162          ++iter)
00163     {
00164         iter->owner()->finalize(bundle, link, &*iter);
00165         total_len += iter->full_length();
00166     }
00167     
00168     return total_len;
00169 }
00170 
00171 //----------------------------------------------------------------------
00172 size_t
00173 BundleProtocol::total_length(const BlockInfoVec* blocks)
00174 {
00175     size_t ret = 0;
00176     for (BlockInfoVec::const_iterator iter = blocks->begin();
00177          iter != blocks->end();
00178          ++iter)
00179     {
00180         ret += iter->full_length();
00181     }
00182 
00183     return ret;
00184 }
00185 
00186 //----------------------------------------------------------------------
00187 size_t
00188 BundleProtocol::payload_offset(const BlockInfoVec* blocks)
00189 {
00190     size_t ret = 0;
00191     for (BlockInfoVec::const_iterator iter = blocks->begin();
00192          iter != blocks->end();
00193          ++iter)
00194     {
00195         if (iter->type() == PAYLOAD_BLOCK) {
00196             ret += iter->data_offset();
00197             return ret;
00198         }
00199 
00200         ret += iter->full_length();
00201     }
00202 
00203     return ret;
00204 }
00205 
00206 //----------------------------------------------------------------------
00207 size_t
00208 BundleProtocol::produce(const Bundle* bundle, const BlockInfoVec* blocks,
00209                         u_char* data, size_t offset, size_t len, bool* last)
00210 {
00211     size_t origlen = len;
00212     *last = false;
00213 
00214     if (len == 0)
00215         return 0;
00216     
00217     // advance past any blocks that are skipped by the given offset
00218     BlockInfoVec::const_iterator iter = blocks->begin();
00219     while (offset >= iter->full_length()) {
00220         log_debug_p(LOG, "BundleProtocol::produce skipping block type 0x%x "
00221                     "since offset %zu >= block length %u",
00222                     iter->type(), offset, iter->full_length());
00223         
00224         offset -= iter->full_length();
00225         iter++;
00226         ASSERT(iter != blocks->end());
00227     }
00228     
00229     // the offset should be within the bounds of this block
00230     ASSERT(iter != blocks->end());
00231         
00232     // figure out the amount to generate from this block
00233     while (1) {
00234         size_t remainder = iter->full_length() - offset;
00235         size_t tocopy    = std::min(len, remainder);
00236         log_debug_p(LOG, "BundleProtocol::produce copying %zu/%zu bytes from "
00237                     "block type 0x%x at offset %zu",
00238                     tocopy, remainder, iter->type(), offset);
00239         iter->owner()->produce(bundle, &*iter, data, offset, tocopy);
00240         
00241         len    -= tocopy;
00242         data   += tocopy;
00243         offset = 0;
00244 
00245         // if we've copied out the full amount the user asked for,
00246         // we're done. note that we need to check the corner case
00247         // where we completed the block exactly to properly set the
00248         // *last bit
00249         if (len == 0) {
00250             if ((tocopy == remainder) &&
00251                 (iter->flags() & BLOCK_FLAG_LAST_BLOCK))
00252             {
00253                 ASSERT(iter + 1 == blocks->end());
00254                 *last = true;
00255             }
00256             
00257             break;
00258         }
00259 
00260         // we completed the current block, so we're done if this
00261         // is the lat block, even if there's space in the user buffer
00262         ASSERT(tocopy == remainder);
00263         if (iter->flags() & BLOCK_FLAG_LAST_BLOCK) {
00264             ASSERT(iter + 1 == blocks->end());
00265             *last = true;
00266             break;
00267         }
00268         
00269         ++iter;
00270         ASSERT(iter != blocks->end());
00271     }
00272     
00273     log_debug_p(LOG, "BundleProtocol::produce complete: "
00274                 "produced %zu bytes, bundle %s",
00275                 origlen - len, *last ? "complete" : "not complete");
00276     
00277     return origlen - len;
00278 }
00279     
00280 //----------------------------------------------------------------------
00281 int
00282 BundleProtocol::consume(Bundle* bundle,
00283                         u_char* data,
00284                         size_t  len,
00285                         bool*   last)
00286 {
00287     size_t origlen = len;
00288     *last = false;
00289     
00290     // special case for first time we get called, since we need to
00291     // create a BlockInfo struct for the primary block without knowing
00292     // the typecode or the length
00293     if (bundle->recv_blocks_.empty()) {
00294         log_debug_p(LOG, "consume: got first block... "
00295                     "creating primary block info");
00296         bundle->recv_blocks_.push_back(BlockInfo(find_processor(PRIMARY_BLOCK)));
00297     }
00298 
00299     // loop as long as there is data left to process
00300     while (len != 0) {
00301         log_debug_p(LOG, "consume: %zu bytes left to process", len);
00302         BlockInfo* info = &bundle->recv_blocks_.back();
00303 
00304         // if the last received block is complete, create a new one
00305         // and push it onto the vector. at this stage we consume all
00306         // blocks, even if there's no BlockProcessor that understands
00307         // how to parse it
00308         if (info->complete()) {
00309             bundle->recv_blocks_.push_back(BlockInfo(find_processor(*data)));
00310             info = &bundle->recv_blocks_.back();
00311             log_debug_p(LOG, "consume: previous block complete, "
00312                         "created new BlockInfo type 0x%x",
00313                         info->owner()->block_type());
00314         }
00315         
00316         // now we know that the block isn't complete, so we tell it to
00317         // consume a chunk of data
00318         log_debug_p(LOG, "consume: block processor 0x%x type 0x%x incomplete, "
00319                     "calling consume (%zu bytes already buffered)",
00320                     info->owner()->block_type(), info->type(),
00321                     info->contents().len());
00322         
00323         int cc = info->owner()->consume(bundle, info, data, len);
00324         if (cc < 0) {
00325             log_err_p(LOG, "consume: protocol error handling block 0x%x",
00326                       info->type());
00327             return -1;
00328         }
00329         
00330         // decrement the amount that was just handled from the overall
00331         // total. verify that the block was either completed or
00332         // consumed all the data that was passed in.
00333         len  -= cc;
00334         data += cc;
00335 
00336         log_debug_p(LOG, "consume: consumed %u bytes of block type 0x%x (%s)",
00337                     cc, info->type(),
00338                     info->complete() ? "complete" : "not completE");
00339 
00340         if (info->complete()) {
00341             // check if we're done with the bundle
00342             if (info->flags() & BLOCK_FLAG_LAST_BLOCK) {
00343                 *last = true;
00344                 break;
00345             }
00346                 
00347         } else {
00348             ASSERT(len == 0);
00349         }
00350     }
00351     
00352     log_debug_p(LOG, "consume completed, %zu/%zu bytes consumed %s",
00353                 origlen - len, origlen, *last ? "(completed bundle)" : "");
00354     return origlen - len;
00355 }
00356 
00357 //----------------------------------------------------------------------
00358 bool
00359 BundleProtocol::validate(Bundle* bundle,
00360                          status_report_reason_t* reception_reason,
00361                          status_report_reason_t* deletion_reason)
00362 {
00363     int primary_blocks = 0, payload_blocks = 0;
00364     BlockInfoVec* recv_blocks = &bundle->recv_blocks_;
00365  
00366     // a bundle must include at least two blocks (primary and payload)
00367     if (recv_blocks->size() < 2) {
00368         log_err_p(LOG, "bundle fails to contain at least two blocks");
00369         *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
00370         return false;
00371     }
00372 
00373     // the first block of a bundle must be a primary block
00374     if (!recv_blocks->front().primary_block()) {
00375         log_err_p(LOG, "bundle fails to contain a primary block");
00376         *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
00377         return false;
00378     }
00379 
00380     // validate each individual block
00381     BlockInfoVec::iterator last_block = recv_blocks->end() - 1;
00382     for (BlockInfoVec::iterator iter = recv_blocks->begin();
00383          iter != recv_blocks->end();
00384          ++iter)
00385     {
00386         if (iter->primary_block()) {
00387             primary_blocks++;
00388         }
00389 
00390         if (iter->payload_block()) {
00391             payload_blocks++;
00392         }
00393 
00394         if (!iter->owner()->validate(bundle, &*iter, reception_reason,
00395                                                      deletion_reason)) {
00396             return false;
00397         }
00398 
00399         // a bundle's last block must be flagged as such,
00400         // and all other blocks should not be flagged
00401         if (iter == last_block) {
00402             if (!iter->last_block()) {
00403                 log_err_p(LOG, "bundle's last block not flagged");
00404                 *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
00405                 return false;
00406             }
00407         } else {
00408             if (iter->last_block()) {
00409                 log_err_p(LOG, "bundle block incorrectly flagged as last");
00410                 *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
00411                 return false;
00412             }
00413         }
00414     }
00415 
00416     // a bundle must contain one, and only one, primary block
00417     if (primary_blocks != 1) {
00418         log_err_p(LOG, "bundle contains %d primary blocks", primary_blocks);
00419         *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
00420         return false;
00421     }
00422            
00423     // a bundle must not contain more than one payload block
00424     if (payload_blocks > 1) {
00425         log_err_p(LOG, "bundle contains %d payload blocks", payload_blocks);
00426         *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
00427         return false;
00428     }
00429 
00430     return true;
00431 }
00432 
00433 //----------------------------------------------------------------------
00434 void
00435 BundleProtocol::set_timestamp(u_char* ts, const BundleTimestamp* tv)
00436 {
00437     u_int32_t tmp;
00438 
00439     tmp = htonl(tv->seconds_);
00440     memcpy(ts, &tmp, sizeof(u_int32_t));
00441     ts += sizeof(u_int32_t);
00442 
00443     tmp = htonl(tv->seqno_);
00444     memcpy(ts, &tmp, sizeof(u_int32_t));
00445 }
00446 
00447 //----------------------------------------------------------------------
00448 void
00449 BundleProtocol::get_timestamp(BundleTimestamp* tv, const u_char* ts)
00450 {
00451     u_int32_t tmp;
00452 
00453     memcpy(&tmp, ts, sizeof(u_int32_t));
00454     tv->seconds_  = ntohl(tmp);
00455     ts += sizeof(u_int32_t);
00456     
00457     memcpy(&tmp, ts, sizeof(u_int32_t));
00458     tv->seqno_ = ntohl(tmp);
00459 }
00460 
00461 //----------------------------------------------------------------------
00462 bool
00463 BundleProtocol::get_admin_type(const Bundle* bundle, admin_record_type_t* type)
00464 {
00465     if (! bundle->is_admin_) {
00466         return false;
00467     }
00468 
00469     u_char buf[16];
00470     const u_char* bp = bundle->payload_.read_data(0, sizeof(buf), buf);
00471 
00472     switch (bp[0] >> 4)
00473     {
00474 #define CASE(_what) case _what: *type = _what; return true;
00475 
00476         CASE(ADMIN_STATUS_REPORT);
00477         CASE(ADMIN_CUSTODY_SIGNAL);
00478         CASE(ADMIN_ANNOUNCE);
00479 
00480 #undef  CASE
00481     default:
00482         return false; // unknown type
00483     }
00484 
00485     NOTREACHED;
00486 }
00487 
00488 } // namespace dtn

Generated on Sat Sep 8 08:36:16 2007 for DTN Reference Implementation by  doxygen 1.5.3