00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #include "PayloadBlockProcessor.h"
00018 #include "Bundle.h"
00019 #include "BundleProtocol.h"
00020
00021 namespace dtn {
00022
00023
00024 PayloadBlockProcessor::PayloadBlockProcessor()
00025 : BlockProcessor(BundleProtocol::PAYLOAD_BLOCK)
00026 {
00027 }
00028
00029
00030 int
00031 PayloadBlockProcessor::consume(Bundle* bundle,
00032 BlockInfo* block,
00033 u_char* buf,
00034 size_t len)
00035 {
00036 static const char* log = "/dtn/bundle/protocol";
00037 (void)log;
00038
00039 size_t consumed = 0;
00040 if (block->data_offset() == 0) {
00041 int cc = consume_preamble(block, buf, len);
00042 if (cc == -1) {
00043 return -1;
00044 }
00045
00046 buf += cc;
00047 len -= cc;
00048
00049 consumed += cc;
00050
00051 ASSERT(bundle->payload_.length() == 0);
00052 }
00053
00054
00055
00056 if (block->data_offset() == 0) {
00057 ASSERT(len == 0);
00058 }
00059
00060
00061
00062 if (bundle->payload_.location() == BundlePayload::NODATA) {
00063 block->set_complete(true);
00064 return consumed;
00065 }
00066
00067
00068
00069
00070 if (len == 0) {
00071 if (block->data_offset() != 0 && block->data_length() == 0) {
00072 block->set_complete(true);
00073 }
00074 return consumed;
00075 }
00076
00077
00078
00079 ASSERT(block->contents().len() == block->data_offset());
00080
00081
00082
00083 ASSERT(block->data_length() > bundle->payload_.length());
00084
00085 size_t rcvd = bundle->payload_.length();
00086 size_t remainder = block->data_length() - rcvd;
00087 size_t tocopy;
00088
00089 if (len >= remainder) {
00090 block->set_complete(true);
00091 tocopy = remainder;
00092 } else {
00093 tocopy = len;
00094 }
00095
00096 bundle->payload_.set_length(rcvd + tocopy);
00097 bundle->payload_.write_data(buf, rcvd, tocopy);
00098
00099 consumed += tocopy;
00100
00101 log_debug_p(log, "PayloadBlockProcessor consumed %zu/%u (%s)",
00102 consumed, block->full_length(),
00103 block->complete() ? "complete" : "not complete");
00104
00105 return consumed;
00106 }
00107
00108
00109 void
00110 PayloadBlockProcessor::generate(const Bundle* bundle,
00111 Link* link,
00112 BlockInfo* block,
00113 bool last)
00114 {
00115 (void)link;
00116
00117
00118
00119 generate_preamble(block,
00120 BundleProtocol::PAYLOAD_BLOCK,
00121 last ? BundleProtocol::BLOCK_FLAG_LAST_BLOCK : 0,
00122 bundle->payload_.length());
00123 }
00124
00125
00126 void
00127 PayloadBlockProcessor::produce(const Bundle* bundle,
00128 const BlockInfo* block,
00129 u_char* buf,
00130 size_t offset,
00131 size_t len)
00132 {
00133
00134 if (offset < block->data_offset()) {
00135 size_t tocopy = std::min(len, block->data_offset() - offset);
00136 memcpy(buf, block->contents().buf() + offset, tocopy);
00137 buf += tocopy;
00138 offset += tocopy;
00139 len -= tocopy;
00140 }
00141
00142 if (len == 0)
00143 return;
00144
00145
00146 size_t payload_offset = offset - block->data_offset();
00147
00148 size_t tocopy = std::min(len, bundle->payload_.length() - payload_offset);
00149 bundle->payload_.read_data(payload_offset, tocopy, buf,
00150 BundlePayload::FORCE_COPY);
00151 }
00152
00153 }