00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include <oasys/debug/Log.h>
00022
00023 #include "BlockProcessor.h"
00024 #include "BlockInfo.h"
00025 #include "Bundle.h"
00026 #include "SDNV.h"
00027
00028 namespace dtn {
00029
// Log path handed to the log_*_p macros used in this file.
static const char* log =
    "/dtn/bundle/protocol";
00031
00032
00033 BlockProcessor::BlockProcessor(int block_type)
00034 : block_type_(block_type)
00035 {
00036
00037 (void)log;
00038 }
00039
00040
00041 BlockProcessor::~BlockProcessor()
00042 {
00043 }
00044
00045
00046 int
00047 BlockProcessor::consume_preamble(BlockInfoVec* recv_blocks,
00048 BlockInfo* block,
00049 u_char* buf,
00050 size_t len,
00051 u_int64_t* flagp)
00052 {
00053 static const char* log = "/dtn/bundle/protocol";
00054 int sdnv_len;
00055 ASSERT(! block->complete());
00056 ASSERT(block->data_offset() == 0);
00057
00058
00059
00060
00061
00062
00063
00064
00065 if ( block->contents().nfree() == 0 ) {
00066 block->writable_contents()->reserve(block->contents().len() + 64);
00067 }
00068
00069 size_t max_preamble = block->contents().buf_len();
00070 size_t prev_consumed = block->contents().len();
00071 size_t tocopy = std::min(len, max_preamble - prev_consumed);
00072
00073 ASSERT(max_preamble > prev_consumed);
00074 BlockInfo::DataBuffer* contents = block->writable_contents();
00075 ASSERT(contents->nfree() >= tocopy);
00076 memcpy(contents->end(), buf, tocopy);
00077 contents->set_len(contents->len() + tocopy);
00078
00079
00080
00081 if (contents->len() <= BundleProtocol::PREAMBLE_FIXED_LENGTH) {
00082 ASSERT(tocopy == len);
00083 return len;
00084 }
00085
00086 size_t buf_offset = BundleProtocol::PREAMBLE_FIXED_LENGTH;
00087 u_int64_t flags;
00088
00089
00090
00091
00092 sdnv_len = SDNV::decode(contents->buf() + buf_offset,
00093 contents->len() - buf_offset,
00094 &flags);
00095 if (sdnv_len == -1) {
00096 ASSERT(tocopy == len);
00097 return len;
00098 }
00099
00100 if (flagp != NULL)
00101 *flagp = flags;
00102
00103 buf_offset += sdnv_len;
00104
00105
00106 Dictionary* dict = recv_blocks->dict();
00107
00108
00109
00110
00111
00112
00113
00114
00115 u_int64_t eid_ref_count = 0LLU;
00116 u_int64_t scheme_offset;
00117 u_int64_t ssp_offset;
00118
00119 ASSERT(block->eid_list().empty());
00120 EndpointIDVector eid_list;
00121
00122 if ( flags & BundleProtocol::BLOCK_FLAG_EID_REFS ) {
00123 sdnv_len = SDNV::decode(contents->buf() + buf_offset,
00124 contents->len() - buf_offset,
00125 &eid_ref_count);
00126 if (sdnv_len == -1) {
00127 ASSERT(tocopy == len);
00128 return len;
00129 }
00130
00131 buf_offset += sdnv_len;
00132
00133 for ( u_int32_t i = 0; i < eid_ref_count; ++i ) {
00134
00135 sdnv_len = SDNV::decode(contents->buf() + buf_offset,
00136 contents->len() - buf_offset,
00137 &scheme_offset);
00138 if (sdnv_len == -1) {
00139 ASSERT(tocopy == len);
00140 return len;
00141 }
00142 buf_offset += sdnv_len;
00143
00144 sdnv_len = SDNV::decode(contents->buf() + buf_offset,
00145 contents->len() - buf_offset,
00146 &ssp_offset);
00147 if (sdnv_len == -1) {
00148 ASSERT(tocopy == len);
00149 return len;
00150 }
00151 buf_offset += sdnv_len;
00152
00153 EndpointID eid;
00154 dict->extract_eid(&eid, scheme_offset, ssp_offset);
00155 eid_list.push_back(eid);
00156 }
00157 }
00158
00159
00160
00161
00162 u_int64_t block_len;
00163 sdnv_len = SDNV::decode(contents->buf() + buf_offset,
00164 contents->len() - buf_offset,
00165 &block_len);
00166 if (sdnv_len == -1) {
00167 ASSERT(tocopy == len);
00168 return len;
00169 }
00170
00171 if (block_len > 0xFFFFFFFFLL) {
00172
00173 log_err_p(log, "overflow in SDNV value for block type 0x%x",
00174 *contents->buf());
00175 return -1;
00176 }
00177
00178 buf_offset += sdnv_len;
00179
00180
00181
00182
00183
00184 block->set_data_length(static_cast<u_int32_t>(block_len));
00185 block->set_data_offset(buf_offset);
00186 contents->set_len(buf_offset);
00187
00188 block->set_eid_list(eid_list);
00189
00190 log_debug_p(log, "BlockProcessor type 0x%x "
00191 "consumed preamble %zu/%u for block: "
00192 "data_offset %u data_length %u eid_ref_count %llu",
00193 block_type(), buf_offset + prev_consumed,
00194 block->full_length(),
00195 block->data_offset(), block->data_length(),
00196 U64FMT(eid_ref_count));
00197
00198
00199
00200 ASSERT(buf_offset > prev_consumed);
00201 return buf_offset - prev_consumed;
00202 }
00203
00204
00205 void
00206 BlockProcessor::generate_preamble(BlockInfoVec* xmit_blocks,
00207 BlockInfo* block,
00208 u_int8_t type,
00209 u_int64_t flags,
00210 u_int64_t data_length)
00211 {
00212 char work[1000];
00213 char* ptr = work;
00214 size_t len = sizeof(work);
00215 int32_t sdnv_len;
00216 u_int32_t scheme_offset;
00217 u_int32_t ssp_offset;
00218
00219
00220 Dictionary* dict = xmit_blocks->dict();
00221
00222
00223 u_int32_t eid_count = block->eid_list().size();
00224 if ( eid_count > 0 ) {
00225 flags |= BundleProtocol::BLOCK_FLAG_EID_REFS;
00226 sdnv_len = SDNV::encode(eid_count, ptr, len);
00227 ptr += sdnv_len;
00228 len -= sdnv_len;
00229 EndpointIDVector::const_iterator iter = block->eid_list().begin();
00230 for ( ; iter < block->eid_list().end(); ++iter ) {
00231 dict->add_eid(*iter);
00232 dict->get_offsets(*iter, &scheme_offset, &ssp_offset);
00233 sdnv_len = SDNV::encode(scheme_offset, ptr, len);
00234 ptr += sdnv_len;
00235 len -= sdnv_len;
00236 sdnv_len = SDNV::encode(ssp_offset, ptr, len);
00237 ptr += sdnv_len;
00238 len -= sdnv_len;
00239 }
00240 }
00241
00242 size_t eid_field_len = ptr - work;
00243
00244 size_t flag_sdnv_len = SDNV::encoding_len(flags);
00245 size_t length_sdnv_len = SDNV::encoding_len(data_length);
00246 ASSERT(block->contents().len() == 0);
00247 ASSERT(block->contents().buf_len() >= BundleProtocol::PREAMBLE_FIXED_LENGTH
00248 + flag_sdnv_len + eid_field_len + length_sdnv_len);
00249
00250 u_char* bp = block->writable_contents()->buf();
00251 len = block->contents().buf_len();
00252
00253 *bp = type;
00254 bp += BundleProtocol::PREAMBLE_FIXED_LENGTH;
00255 len -= BundleProtocol::PREAMBLE_FIXED_LENGTH;
00256
00257 SDNV::encode(flags, bp, flag_sdnv_len);
00258 bp += flag_sdnv_len;
00259 len -= flag_sdnv_len;
00260
00261 memcpy(bp, work, eid_field_len);
00262 bp += eid_field_len;
00263 len -= eid_field_len;
00264
00265 SDNV::encode(data_length, bp, length_sdnv_len);
00266 bp += length_sdnv_len;
00267 len -= length_sdnv_len;
00268
00269 block->set_data_length(data_length);
00270 u_int32_t offset = BundleProtocol::PREAMBLE_FIXED_LENGTH +
00271 flag_sdnv_len + eid_field_len + length_sdnv_len;
00272 block->set_data_offset(offset);
00273 block->writable_contents()->set_len(offset);
00274
00275 log_debug_p(log, "BlockProcessor type 0x%x "
00276 "generated preamble for block type 0x%x flags 0x%llx: "
00277 "data_offset %u data_length %u eid_count %u",
00278 block_type(), block->type(), U64FMT(block->flags()),
00279 block->data_offset(), block->data_length(), eid_count);
00280 }
00281
00282
00283 int
00284 BlockProcessor::consume(Bundle* bundle,
00285 BlockInfo* block,
00286 u_char* buf,
00287 size_t len)
00288 {
00289 (void)bundle;
00290
00291 static const char* log = "/dtn/bundle/protocol";
00292 (void)log;
00293
00294 size_t consumed = 0;
00295
00296 ASSERT(! block->complete());
00297 BlockInfoVec* recv_blocks = bundle->mutable_recv_blocks();
00298
00299
00300
00301
00302 if (block->data_offset() == 0) {
00303 int cc = consume_preamble(recv_blocks, block, buf, len);
00304 if (cc == -1) {
00305 return -1;
00306 }
00307
00308 buf += cc;
00309 len -= cc;
00310
00311 consumed += cc;
00312 }
00313
00314
00315
00316 if (block->data_offset() == 0) {
00317 ASSERT(len == 0);
00318 }
00319
00320
00321
00322 if (block->data_offset() != 0 && block->data_length() == 0) {
00323 block->set_complete(true);
00324 }
00325
00326
00327 if (len == 0)
00328 return consumed;
00329
00330
00331
00332 ASSERT(block->data_length() == 0 ||
00333 block->full_length() > block->contents().len());
00334
00335
00336 block->writable_contents()->reserve(block->full_length());
00337
00338 size_t rcvd = block->contents().len();
00339 size_t remainder = block->full_length() - rcvd;
00340 size_t tocopy;
00341
00342 if (len >= remainder) {
00343 block->set_complete(true);
00344 tocopy = remainder;
00345 } else {
00346 tocopy = len;
00347 }
00348
00349
00350 memcpy(block->writable_contents()->end(), buf, tocopy);
00351 block->writable_contents()->set_len(rcvd + tocopy);
00352 len -= tocopy;
00353 consumed += tocopy;
00354
00355 log_debug_p(log, "BlockProcessor type 0x%x "
00356 "consumed %zu/%u for block type 0x%x (%s)",
00357 block_type(), consumed, block->full_length(), block->type(),
00358 block->complete() ? "complete" : "not complete");
00359
00360 return consumed;
00361 }
00362
00363
00364 bool
00365 BlockProcessor::validate(const Bundle* bundle,
00366 BlockInfoVec* block_list,
00367 BlockInfo* block,
00368 status_report_reason_t* reception_reason,
00369 status_report_reason_t* deletion_reason)
00370 {
00371 static const char * log = "/dtn/bundle/protocol";
00372 (void)block_list;
00373 (void)reception_reason;
00374
00375
00376
00377
00378 if (bundle->is_admin() &&
00379 block->type() != BundleProtocol::PRIMARY_BLOCK &&
00380 block->flags() & BundleProtocol::BLOCK_FLAG_REPORT_ONERROR) {
00381 log_err_p(log, "invalid block flag 0x%x for received admin bundle",
00382 BundleProtocol::BLOCK_FLAG_REPORT_ONERROR);
00383 *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
00384 return false;
00385 }
00386
00387 return true;
00388 }
00389
00390
00391 int
00392 BlockProcessor::reload_post_process(Bundle* bundle,
00393 BlockInfoVec* block_list,
00394 BlockInfo* block)
00395 {
00396 (void)bundle;
00397 (void)block_list;
00398 (void)block;
00399
00400 block->set_reloaded(false);
00401 return 0;
00402 }
00403
00404
00405 int
00406 BlockProcessor::prepare(const Bundle* bundle,
00407 BlockInfoVec* xmit_blocks,
00408 const BlockInfo* source,
00409 const LinkRef& link,
00410 list_owner_t list)
00411 {
00412 (void)bundle;
00413 (void)link;
00414 (void)list;
00415
00416
00417
00418
00419
00420
00421 if (list == BlockInfo::LIST_RECEIVED) {
00422 xmit_blocks->append_block(this, source);
00423 }
00424 else {
00425 ASSERT((*xmit_blocks)[0].type() == BundleProtocol::PRIMARY_BLOCK);
00426 xmit_blocks->insert(xmit_blocks->begin() + 1, BlockInfo(this, source));
00427 }
00428 return BP_SUCCESS;
00429 }
00430
00431
00432 int
00433 BlockProcessor::finalize(const Bundle* bundle,
00434 BlockInfoVec* xmit_blocks,
00435 BlockInfo* block,
00436 const LinkRef& link)
00437 {
00438 (void)xmit_blocks;
00439 (void)link;
00440
00441 if (bundle->is_admin() && block->type() != BundleProtocol::PRIMARY_BLOCK) {
00442 ASSERT((block->flags() &
00443 BundleProtocol::BLOCK_FLAG_REPORT_ONERROR) == 0);
00444 }
00445 return BP_SUCCESS;
00446 }
00447
00448
00449 void
00450 BlockProcessor::process(process_func* func,
00451 const Bundle* bundle,
00452 const BlockInfo* caller_block,
00453 const BlockInfo* target_block,
00454 size_t offset,
00455 size_t len,
00456 OpaqueContext* context)
00457 {
00458 u_char* buf;
00459
00460 ASSERT(offset < target_block->contents().len());
00461 ASSERT(target_block->contents().len() >= offset + len);
00462
00463
00464 buf = target_block->contents().buf() + offset;
00465
00466
00467 (*func)(bundle, caller_block, target_block, buf, len, context);
00468 }
00469
00470
00471 bool
00472 BlockProcessor::mutate(mutate_func* func,
00473 Bundle* bundle,
00474 const BlockInfo* caller_block,
00475 BlockInfo* target_block,
00476 size_t offset,
00477 size_t len,
00478 OpaqueContext* context)
00479 {
00480 u_char* buf;
00481
00482 ASSERT(offset < target_block->contents().len());
00483 ASSERT(target_block->contents().len() >= offset + len);
00484
00485
00486 buf = target_block->contents().buf() + offset;
00487
00488
00489 return (*func)(bundle, caller_block, target_block, buf, len, context);
00490
00491
00492 }
00493
00494
00495 void
00496 BlockProcessor::produce(const Bundle* bundle,
00497 const BlockInfo* block,
00498 u_char* buf,
00499 size_t offset,
00500 size_t len)
00501 {
00502 (void)bundle;
00503 ASSERT(offset < block->contents().len());
00504 ASSERT(block->contents().len() >= offset + len);
00505 memcpy(buf, block->contents().buf() + offset, len);
00506 }
00507
00508
00509 void
00510 BlockProcessor::init_block(BlockInfo* block,
00511 BlockInfoVec* block_list,
00512 u_int8_t type,
00513 u_int8_t flags,
00514 const u_char* bp,
00515 size_t len)
00516 {
00517 ASSERT(block->owner() != NULL);
00518 generate_preamble(block_list, block, type, flags, len);
00519 ASSERT(block->data_offset() != 0);
00520 block->writable_contents()->reserve(block->full_length());
00521 block->writable_contents()->set_len(block->full_length());
00522 memcpy(block->writable_contents()->buf() + block->data_offset(),
00523 bp, len);
00524 }
00525
00526 }