00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include <oasys/util/OptParser.h>
00040 #include "StreamConvergenceLayer.h"
00041 #include "bundling/AnnounceBundle.h"
00042 #include "bundling/BundleDaemon.h"
00043 #include "bundling/SDNV.h"
00044 #include "bundling/TempBundle.h"
00045 #include "contacts/ContactManager.h"
00046
00047 namespace dtn {
00048
00049
00050 StreamConvergenceLayer::StreamLinkParams::StreamLinkParams(bool init_defaults)
00051 : LinkParams(init_defaults),
00052 segment_ack_enabled_(true),
00053 keepalive_interval_(10),
00054 segment_length_(4096)
00055 {
00056 }
00057
00058
00059 StreamConvergenceLayer::StreamConvergenceLayer(const char* logpath,
00060 const char* cl_name,
00061 u_int8_t cl_version)
00062 : ConnectionConvergenceLayer(logpath, cl_name),
00063 cl_version_(cl_version)
00064 {
00065 }
00066
00067
00068 bool
00069 StreamConvergenceLayer::parse_link_params(LinkParams* lparams,
00070 int argc, const char** argv,
00071 const char** invalidp)
00072 {
00073
00074
00075 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(lparams);
00076 ASSERT(params != NULL);
00077
00078 oasys::OptParser p;
00079
00080 p.addopt(new oasys::BoolOpt("segment_ack_enabled",
00081 ¶ms->segment_ack_enabled_));
00082
00083 p.addopt(new oasys::UIntOpt("keepalive_interval",
00084 ¶ms->keepalive_interval_));
00085
00086 p.addopt(new oasys::UIntOpt("segment_length",
00087 ¶ms->segment_length_));
00088
00089 int count = p.parse_and_shift(argc, argv, invalidp);
00090 if (count == -1) {
00091 return false;
00092 }
00093 argc -= count;
00094
00095 return ConnectionConvergenceLayer::parse_link_params(lparams, argc, argv,
00096 invalidp);
00097 }
00098
00099
00100 bool
00101 StreamConvergenceLayer::finish_init_link(Link* link, LinkParams* lparams)
00102 {
00103 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(lparams);
00104 ASSERT(params != NULL);
00105
00106
00107 if (params->segment_ack_enabled_) {
00108 link->set_reliable(true);
00109 }
00110
00111 return true;
00112 }
00113
00114
00115 void
00116 StreamConvergenceLayer::dump_link(Link* link, oasys::StringBuffer* buf)
00117 {
00118 ConnectionConvergenceLayer::dump_link(link, buf);
00119
00120 StreamLinkParams* params =
00121 dynamic_cast<StreamLinkParams*>(link->cl_info());
00122 ASSERT(params != NULL);
00123
00124 buf->appendf("segment_ack_enabled: %u\n", params->segment_ack_enabled_);
00125 buf->appendf("keepalive_interval: %u\n", params->keepalive_interval_);
00126 buf->appendf("segment_length: %u\n", params->segment_length_);
00127 }
00128
00129
00130 StreamConvergenceLayer::Connection::Connection(const char* classname,
00131 const char* logpath,
00132 ConvergenceLayer* cl,
00133 StreamLinkParams* params,
00134 bool active_connector)
00135 : CLConnection(classname, logpath, cl, params, active_connector),
00136 current_inflight_(NULL),
00137 send_segment_todo_(0),
00138 recv_segment_todo_(0)
00139 {
00140 }
00141
00142
00143 void
00144 StreamConvergenceLayer::Connection::initiate_contact()
00145 {
00146 log_debug("initiate_contact called");
00147
00148
00149 ContactHeader contacthdr;
00150 contacthdr.magic = htonl(MAGIC);
00151 contacthdr.version = ((StreamConvergenceLayer*)cl_)->cl_version_;
00152
00153 contacthdr.flags = 0;
00154
00155 StreamLinkParams* params = stream_lparams();
00156
00157 if (params->segment_ack_enabled_)
00158 contacthdr.flags |= SEGMENT_ACK_ENABLED;
00159
00160 if (params->reactive_frag_enabled_)
00161 contacthdr.flags |= REACTIVE_FRAG_ENABLED;
00162
00163 contacthdr.keepalive_interval = htons(params->keepalive_interval_);
00164
00165
00166 ASSERT(sendbuf_.fullbytes() == 0);
00167 if (sendbuf_.tailbytes() < sizeof(ContactHeader)) {
00168 log_warn("send buffer too short: %zu < needed %zu",
00169 sendbuf_.tailbytes(), sizeof(ContactHeader));
00170 sendbuf_.reserve(sizeof(ContactHeader));
00171 }
00172
00173 memcpy(sendbuf_.start(), &contacthdr, sizeof(ContactHeader));
00174 sendbuf_.fill(sizeof(ContactHeader));
00175
00176
00177
00178 TempBundle announce;
00179 BundleDaemon* bd = BundleDaemon::instance();
00180 AnnounceBundle::create_announce_bundle(&announce, bd->local_eid());
00181 size_t announce_len = BundleProtocol::formatted_length(&announce);
00182 size_t sdnv_len = SDNV::encoding_len(announce_len);
00183
00184 if (sendbuf_.tailbytes() < sdnv_len + announce_len) {
00185 log_warn("send buffer too short: %zu < needed %zu",
00186 sendbuf_.tailbytes(), sdnv_len + announce_len);
00187 sendbuf_.reserve(sdnv_len + announce_len);
00188 }
00189
00190 sdnv_len = SDNV::encode(announce_len,
00191 (u_char*)sendbuf_.end(),
00192 sendbuf_.tailbytes());
00193 sendbuf_.fill(sdnv_len);
00194
00195 int ret = BundleProtocol::format_bundle(&announce,
00196 (u_char*)sendbuf_.end(),
00197 sendbuf_.tailbytes());
00198
00199 ASSERTF(ret > 0, "BundleProtocol::formatted_length disagreed with "
00200 "BundleProtocol::format_bundle");
00201 sendbuf_.fill(ret);
00202
00203
00204 note_data_sent();
00205 send_data();
00206 }
00207
00208
00209 void
00210 StreamConvergenceLayer::Connection::handle_contact_initiation()
00211 {
00212 ASSERT(! contact_up_);
00213
00214
00215
00216
00217 size_t len_needed = sizeof(ContactHeader);
00218 if (recvbuf_.fullbytes() < len_needed) {
00219 tooshort:
00220 log_debug("handle_contact_initiation: not enough data received "
00221 "(need > %zu, got %zu)",
00222 len_needed, recvbuf_.fullbytes());
00223 return;
00224 }
00225
00226
00227
00228
00229 u_int64_t announce_len;
00230 int sdnv_len = SDNV::decode((u_char*)recvbuf_.start() +
00231 sizeof(ContactHeader),
00232 recvbuf_.fullbytes() -
00233 sizeof(ContactHeader),
00234 &announce_len);
00235 if (sdnv_len < 0) {
00236 goto tooshort;
00237 }
00238
00239 len_needed = sizeof(ContactHeader) + sdnv_len + announce_len;
00240 if (recvbuf_.fullbytes() < len_needed) {
00241 goto tooshort;
00242 }
00243
00244
00245
00246
00247 ContactHeader contacthdr;
00248 memcpy(&contacthdr, recvbuf_.start(), sizeof(ContactHeader));
00249
00250 contacthdr.magic = ntohl(contacthdr.magic);
00251 contacthdr.keepalive_interval = ntohs(contacthdr.keepalive_interval);
00252
00253 recvbuf_.consume(sizeof(ContactHeader));
00254
00255
00256
00257
00258 if (contacthdr.magic != MAGIC) {
00259 log_warn("remote sent magic number 0x%.8x, expected 0x%.8x "
00260 "-- disconnecting.", contacthdr.magic, MAGIC);
00261 break_contact(ContactEvent::BROKEN);
00262 return;
00263 }
00264
00265 u_int8_t cl_version = ((StreamConvergenceLayer*)cl_)->cl_version_;
00266 if (contacthdr.version != cl_version) {
00267 log_warn("remote sent version %d, expected version %d "
00268 "-- disconnecting.", contacthdr.version, cl_version);
00269 break_contact(ContactEvent::BROKEN);
00270 return;
00271 }
00272
00273
00274
00275
00276 StreamLinkParams* params = stream_lparams();
00277
00278 params->keepalive_interval_ =
00279 std::min(params->keepalive_interval_,
00280 (u_int)contacthdr.keepalive_interval);
00281
00282 params->segment_ack_enabled_ = params->segment_ack_enabled_ &&
00283 (contacthdr.flags & SEGMENT_ACK_ENABLED);
00284
00285 params->reactive_frag_enabled_ = params->reactive_frag_enabled_ &&
00286 (contacthdr.flags & REACTIVE_FRAG_ENABLED);
00287
00288
00289
00290
00291
00292 recvbuf_.consume(sdnv_len);
00293
00294
00295
00296
00297
00298
00299 TempBundle announce;
00300 int ret = BundleProtocol::parse_bundle(&announce,
00301 (u_char*)recvbuf_.start(),
00302 recvbuf_.fullbytes());
00303
00304 if (ret != (int)announce_len) {
00305 if (ret < 0) {
00306 log_err("protocol error: announce bundle length given as %llu, "
00307 "parser requires more bytes", announce_len);
00308 } else {
00309 log_err("protocol error: announce bundle length given as %llu, "
00310 "parser requires only %u", announce_len, ret);
00311 }
00312 break_contact(ContactEvent::BROKEN);
00313 return;
00314 }
00315
00316 handle_announce_bundle(&announce);
00317 recvbuf_.consume(announce_len);
00318
00319
00320
00321
00322
00323
00324 ::gettimeofday(&data_rcvd_, 0);
00325 ::gettimeofday(&data_sent_, 0);
00326 ::gettimeofday(&keepalive_sent_, 0);
00327
00328
00329
00330
00331 contact_up();
00332 }
00333
00334
00335
00336 void
00337 StreamConvergenceLayer::Connection::handle_send_bundle(Bundle* bundle)
00338 {
00339
00340
00341 InFlightBundle* inflight = new InFlightBundle(bundle);
00342
00343 inflight->header_block_length_ =
00344 BundleProtocol::header_block_length(bundle);
00345
00346 inflight->tail_block_length_ =
00347 BundleProtocol::tail_block_length(bundle);
00348
00349 inflight->formatted_length_ =
00350 inflight->header_block_length_ +
00351 inflight->bundle_->payload_.length() +
00352 inflight->tail_block_length_;
00353
00354 inflight_.push_back(inflight);
00355 }
00356
00357
00358 bool
00359 StreamConvergenceLayer::Connection::send_pending_data()
00360 {
00361
00362
00363 if (sendbuf_.tailbytes() == 0) {
00364 return false;
00365 }
00366
00367
00368
00369
00370
00371 if (send_segment_todo_ != 0) {
00372 ASSERT(current_inflight_ != NULL);
00373 send_data_todo(current_inflight_);
00374 }
00375
00376
00377 if (contact_broken_ || (send_segment_todo_ != 0)) {
00378 return false;
00379 }
00380
00381
00382
00383
00384
00385 bool sent_ack = send_pending_acks();
00386
00387
00388
00389 bool sent_data;
00390 if (current_inflight_ == NULL) {
00391 sent_data = start_next_bundle();
00392 } else {
00393
00394 sent_data = send_next_segment(current_inflight_);
00395 }
00396
00397 return sent_ack || sent_data;
00398 }
00399
00400
00401 bool
00402 StreamConvergenceLayer::Connection::send_pending_acks()
00403 {
00404 if (contact_broken_ || incoming_.empty()) {
00405 return false;
00406 }
00407
00408 IncomingBundle* incoming = incoming_.front();
00409
00410 if (incoming->ack_data_.empty()) {
00411 return false;
00412 }
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427 DataBitmap::iterator start = incoming->ack_data_.begin();
00428 start.skip_contiguous();
00429 DataBitmap::iterator end = start + 1;
00430
00431 size_t received_bytes = incoming->rcvd_data_.last() + 1;
00432
00433 int sent_start = -1;
00434 size_t last_ack = 0;
00435 while (end != incoming->ack_data_.end()) {
00436 size_t ack_len = *end + 1;
00437 size_t segment_len = ack_len - *start;
00438 (void)segment_len;
00439
00440 if (ack_len > received_bytes) {
00441 log_debug("send_pending_acks: "
00442 "waiting to send ack length %zu for %zu byte segment "
00443 "since only received %zu",
00444 ack_len, segment_len, received_bytes);
00445 break;
00446 }
00447
00448
00449 size_t encoding_len = 1 + SDNV::encoding_len(ack_len);
00450 if (encoding_len <= sendbuf_.tailbytes()) {
00451 log_debug("send_pending_acks: "
00452 "sending ack length %zu for %zu byte segment "
00453 "[range %zu..%zu]",
00454 ack_len, segment_len, *start, *end);
00455
00456 if (sent_start == -1) {
00457 sent_start = *start;
00458 }
00459
00460 ASSERT(last_ack <= ack_len);
00461 last_ack = ack_len;
00462
00463 *sendbuf_.end() = ACK_SEGMENT;
00464 int len = SDNV::encode(ack_len, (u_char*)sendbuf_.end() + 1,
00465 sendbuf_.tailbytes() - 1);
00466 ASSERT(encoding_len = len + 1);
00467 sendbuf_.fill(encoding_len);
00468
00469 } else {
00470 log_debug("send_pending_acks: "
00471 "no space for ack in buffer (need %zu, have %zu)",
00472 encoding_len, sendbuf_.tailbytes());
00473 break;
00474 }
00475
00476
00477 start = end + 1;
00478 if (start == incoming->ack_data_.end()) {
00479 break;
00480 }
00481
00482 end = start + 1;
00483
00484 log_debug("send_pending_acks: "
00485 "found another gap (%zu, %zu)", *start, *end);
00486 }
00487
00488 if (last_ack != 0) {
00489 incoming->ack_data_.set(0, last_ack);
00490
00491 log_debug("send_pending_acks: "
00492 "updated ack_data for ack length %zu: *%p ",
00493 last_ack, &incoming->ack_data_);
00494
00495 send_data();
00496 note_data_sent();
00497 }
00498
00499
00500
00501
00502 if ((incoming->total_length_ != 0) &&
00503 (incoming->ack_data_.num_contiguous() == incoming->total_length_))
00504 {
00505 log_debug("send_pending_acks: acked all %zu bytes of bundle %d",
00506 incoming->total_length_, incoming->bundle_->bundleid_);
00507
00508 incoming_.pop_front();
00509 delete incoming;
00510 }
00511 else
00512 {
00513 log_debug("send_pending_acks: "
00514 "still need to send acks -- total_rcvd %zu, acked_range %zu",
00515 incoming->total_length_,
00516 incoming->ack_data_.num_contiguous());
00517 }
00518
00519
00520 return (last_ack != 0);
00521 }
00522
00523
00524
00525 bool
00526 StreamConvergenceLayer::Connection::start_next_bundle()
00527 {
00528 ASSERT(current_inflight_ == NULL);
00529
00530
00531
00532 InFlightList::iterator iter;
00533 for (iter = inflight_.begin(); iter != inflight_.end(); ++iter) {
00534 InFlightBundle* inflight = *iter;
00535
00536 if (contact_broken_)
00537 return false;
00538
00539
00540 if (inflight->sent_data_.num_contiguous() ==
00541 inflight->formatted_length_)
00542 {
00543 log_debug("start_next_bundle: "
00544 "transmission of bundle %d already complete, skipping",
00545 inflight->bundle_->bundleid_);
00546 continue;
00547 }
00548
00549
00550
00551 ASSERT(inflight->sent_data_.empty());
00552 current_inflight_ = inflight;
00553 break;
00554 }
00555
00556
00557
00558 if (current_inflight_ == NULL) {
00559 return false;
00560 }
00561
00562 Bundle* bundle = current_inflight_->bundle_.object();
00563 StreamLinkParams* params = stream_lparams();
00564
00565
00566
00567
00568
00569 size_t header_len = BundleProtocol::header_block_length(bundle);
00570 size_t segment_len = std::min((size_t)params->segment_length_,
00571 current_inflight_->formatted_length_);
00572 if (segment_len < header_len) {
00573 log_warn("configured segment_len %zu smaller than header segment "
00574 "length %zu... upping segment length", segment_len, header_len);
00575 segment_len = header_len;
00576 }
00577
00578 size_t sdnv_len = SDNV::encoding_len(segment_len);
00579
00580
00581
00582
00583
00584 size_t min_buffer_len = 1 + 1 + sdnv_len + header_len;
00585 if (min_buffer_len > sendbuf_.tailbytes()) {
00586 if (min_buffer_len > sendbuf_.size()) {
00587 log_warn("start_bundle: "
00588 "not enough space for headers [need %zu, have %zu] "
00589 "(expanding buffer)",
00590 min_buffer_len, sendbuf_.size());
00591 sendbuf_.reserve(min_buffer_len);
00592 ASSERT(min_buffer_len <= sendbuf_.tailbytes());
00593
00594 } else {
00595 log_debug("start_bundle: "
00596 "not enough space for headers [need %zu, have %zu] "
00597 "(waiting for buffer to drain)",
00598 min_buffer_len, sendbuf_.tailbytes());
00599
00600 current_inflight_ = NULL;
00601 return false;
00602 }
00603 }
00604
00605 log_debug("send_start_bundle: sending "
00606 "START_BUNDLE for bundle %d: initial segment len %zu [hdrs %zu]",
00607 current_inflight_->bundle_->bundleid_, segment_len, header_len);
00608
00609 u_char* bp = (u_char*)sendbuf_.end();
00610 size_t len = sendbuf_.tailbytes();
00611
00612 *bp++ = START_BUNDLE;
00613 len--;
00614
00615 *bp++ = DATA_SEGMENT;
00616 len--;
00617
00618 int cc = SDNV::encode(segment_len, bp, len);
00619 ASSERT(cc == (int)sdnv_len);
00620
00621 bp += sdnv_len;
00622 len -= sdnv_len;
00623
00624 cc = BundleProtocol::format_header_blocks(bundle, bp, len);
00625 ASSERT(cc == (int)header_len);
00626
00627 sendbuf_.fill(1 + 1 + sdnv_len + header_len);
00628 current_inflight_->sent_data_.set(0, header_len);
00629
00630
00631
00632
00633
00634
00635 send_segment_todo_ = segment_len - header_len;
00636
00637 if (send_segment_todo_ == 0) {
00638 note_data_sent();
00639 send_data();
00640 return true;
00641 }
00642
00643 return send_data_todo(current_inflight_);
00644 }
00645
00646
00647 bool
00648 StreamConvergenceLayer::Connection::send_next_segment(InFlightBundle* inflight)
00649 {
00650 if (sendbuf_.tailbytes() == 0) {
00651 return false;
00652 }
00653
00654 ASSERT(send_segment_todo_ == 0);
00655
00656 StreamLinkParams* params = stream_lparams();
00657 Bundle* bundle = inflight->bundle_.object();
00658
00659
00660 ASSERT(!inflight->sent_data_.empty());
00661
00662 size_t bytes_sent = inflight->sent_data_.last() + 1;
00663 size_t payload_sent = bytes_sent - inflight->header_block_length_;
00664 size_t payload_len = bundle->payload_.length();
00665
00666 if (payload_sent == payload_len) {
00667 log_debug("send_next_segment: "
00668 "already sent all %zu bytes of payload, finishing bundle",
00669 payload_len);
00670 return finish_bundle(inflight);
00671 }
00672
00673 size_t segment_len = std::min((size_t)params->segment_length_,
00674 payload_len - payload_sent);
00675
00676 ASSERT(payload_sent + segment_len <= payload_len);
00677
00678 size_t sdnv_len = SDNV::encoding_len(segment_len);
00679
00680 if (sendbuf_.tailbytes() < 1 + sdnv_len) {
00681 log_debug("send_next_segment: "
00682 "not enough space for segment header [need %zu, have %zu]",
00683 1 + sdnv_len, sendbuf_.tailbytes());
00684 return false;
00685 }
00686
00687 log_debug("send_next_segment: "
00688 "starting %zu byte segment [payload range %zu..%zu]",
00689 segment_len, payload_sent, payload_sent + segment_len);
00690
00691 u_char* bp = (u_char*)sendbuf_.end();
00692 *bp++ = DATA_SEGMENT;
00693 int cc = SDNV::encode(segment_len, bp, sendbuf_.tailbytes() - 1);
00694 ASSERT(cc == (int)sdnv_len);
00695 bp += sdnv_len;
00696
00697 sendbuf_.fill(1 + sdnv_len);
00698 send_segment_todo_ = segment_len;
00699
00700
00701 return send_data_todo(inflight);
00702 }
00703
00704
00705 bool
00706 StreamConvergenceLayer::Connection::send_data_todo(InFlightBundle* inflight)
00707 {
00708 ASSERT(send_segment_todo_ != 0);
00709
00710 while (send_segment_todo_ != 0 && sendbuf_.tailbytes() != 0) {
00711 u_char* bp = (u_char*)sendbuf_.end();
00712
00713 size_t bytes_sent = inflight->sent_data_.last() + 1;
00714 size_t payload_offset = bytes_sent - inflight->header_block_length_;
00715 size_t send_len = std::min(send_segment_todo_, sendbuf_.tailbytes());
00716
00717 Bundle* bundle = inflight->bundle_.object();
00718 bundle->payload_.read_data(payload_offset, send_len, bp,
00719 BundlePayload::FORCE_COPY);
00720 sendbuf_.fill(send_len);
00721
00722 inflight->sent_data_.set(bytes_sent, send_len);
00723
00724 log_debug("send_data_todo: "
00725 "sent %zu/%zu of current segment from payload offset %zu "
00726 "(%zu todo), updated sent_data *%p",
00727 send_len, send_segment_todo_, payload_offset,
00728 send_segment_todo_ - send_len, &inflight->sent_data_);
00729
00730 send_segment_todo_ -= send_len;
00731
00732 note_data_sent();
00733 send_data();
00734
00735 if (contact_broken_)
00736 return true;
00737 }
00738
00739 return (send_segment_todo_ == 0);
00740 }
00741
00742
00743 bool
00744 StreamConvergenceLayer::Connection::finish_bundle(InFlightBundle* inflight)
00745 {
00746 if (sendbuf_.tailbytes() == 0) {
00747 log_warn("finish_bundle: rare out of space condition... "
00748 "making room for one byte");
00749 sendbuf_.reserve(1);
00750 ASSERT(sendbuf_.tailbytes() > 0);
00751 }
00752
00753 log_debug("finish_bundle: sending END_BUNDLE message");
00754
00755 *sendbuf_.end() = END_BUNDLE;
00756 sendbuf_.fill(1);
00757
00758 note_data_sent();
00759 send_data();
00760
00761 ASSERT(! inflight->bundle_->payload_.is_file_open());
00762
00763 ASSERT(current_inflight_ == inflight);
00764 current_inflight_ = NULL;
00765
00766 check_completed(inflight);
00767
00768 return true;
00769 }
00770
00771
00772 void
00773 StreamConvergenceLayer::Connection::check_completed(InFlightBundle* inflight)
00774 {
00775
00776
00777
00778
00779
00780
00781 if (current_inflight_ == inflight) {
00782 log_debug("check_completed: bundle %d still waiting for finish_bundle",
00783 inflight->bundle_->bundleid_);
00784 return;
00785 }
00786
00787 size_t acked_len = inflight->ack_data_.num_contiguous();
00788 if (acked_len < inflight->formatted_length_) {
00789 log_debug("check_completed: bundle %d only acked %zu/%zu",
00790 inflight->bundle_->bundleid_,
00791 acked_len, inflight->formatted_length_);
00792 return;
00793 }
00794
00795 log_debug("check_completed: bundle %d transmission complete",
00796 inflight->bundle_->bundleid_);
00797 ASSERT(inflight == inflight_.front());
00798 inflight_.pop_front();
00799 delete inflight;
00800 }
00801
00802
00803 void
00804 StreamConvergenceLayer::Connection::send_keepalive()
00805 {
00806
00807
00808
00809
00810 if (sendbuf_.fullbytes() != 0) {
00811 log_debug("send_keepalive: "
00812 "send buffer has %zu bytes queued, suppressing keepalive",
00813 sendbuf_.fullbytes());
00814 return;
00815 }
00816 ASSERT(sendbuf_.tailbytes() > 0);
00817
00818 ::gettimeofday(&keepalive_sent_, 0);
00819
00820 *(sendbuf_.end()) = KEEPALIVE;
00821 sendbuf_.fill(1);
00822
00823
00824
00825 send_data();
00826 }
00827
00828 void
00829 StreamConvergenceLayer::Connection::handle_cancel_bundle(Bundle* bundle)
00830 {
00831 (void)bundle;
00832 }
00833
00834
00835 void
00836 StreamConvergenceLayer::Connection::handle_poll_timeout()
00837 {
00838 struct timeval now;
00839 u_int elapsed, elapsed2;
00840
00841 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(params_);
00842 ASSERT(params != NULL);
00843
00844 ::gettimeofday(&now, 0);
00845
00846
00847
00848 elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
00849 if (elapsed > params->data_timeout_) {
00850 log_info("handle_poll_timeout: no data heard for %d msecs "
00851 "(keepalive_sent %u.%u, data_rcvd %u.%u, now %u.%u) "
00852 "-- closing contact",
00853 elapsed,
00854 (u_int)keepalive_sent_.tv_sec,
00855 (u_int)keepalive_sent_.tv_usec,
00856 (u_int)data_rcvd_.tv_sec, (u_int)data_rcvd_.tv_usec,
00857 (u_int)now.tv_sec, (u_int)now.tv_usec);
00858
00859 break_contact(ContactEvent::BROKEN);
00860 return;
00861 }
00862
00863
00864
00865 if (contact_->link()->type() == Link::ONDEMAND) {
00866 u_int idle_close_time = contact_->link()->params().idle_close_time_;
00867
00868 elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
00869 elapsed2 = TIMEVAL_DIFF_MSEC(now, data_sent_);
00870
00871 if (idle_close_time != 0 &&
00872 (elapsed > idle_close_time * 1000) &&
00873 (elapsed2 > idle_close_time * 1000))
00874 {
00875 log_info("closing idle connection "
00876 "(no data received for %d msecs or sent for %d msecs)",
00877 elapsed, elapsed2);
00878 break_contact(ContactEvent::IDLE);
00879 return;
00880 } else {
00881 log_debug("connection not idle: recvd %d / sent %d <= timeout %d",
00882 elapsed, elapsed2, idle_close_time * 1000);
00883 }
00884 }
00885
00886
00887
00888
00889 if (params->keepalive_interval_ != 0) {
00890 elapsed = TIMEVAL_DIFF_MSEC(now, data_sent_);
00891 elapsed2 = TIMEVAL_DIFF_MSEC(now, keepalive_sent_);
00892
00893 if (std::min(elapsed, elapsed2) > (params->keepalive_interval_ * 1000))
00894 {
00895 log_debug("handle_poll_timeout: sending keepalive");
00896 send_keepalive();
00897 }
00898 }
00899 }
00900
00901
00902 void
00903 StreamConvergenceLayer::Connection::process_data()
00904 {
00905 if (recvbuf_.fullbytes() == 0) {
00906 return;
00907 }
00908
00909 log_debug("processing up to %zu bytes from receive buffer",
00910 recvbuf_.fullbytes());
00911
00912
00913
00914 note_data_rcvd();
00915
00916
00917
00918
00919 if (! contact_up_) {
00920 handle_contact_initiation();
00921 return;
00922 }
00923
00924
00925
00926
00927
00928 if (recv_segment_todo_ != 0) {
00929 bool ok = handle_data_todo();
00930
00931 if (!ok) {
00932 return;
00933 }
00934 }
00935
00936
00937
00938
00939
00940
00941 while (recvbuf_.fullbytes() != 0) {
00942 if (contact_broken_) return;
00943
00944 char type = *recvbuf_.start();
00945
00946 log_debug("recvbuf has %zu full bytes, dispatching to handler routine",
00947 recvbuf_.fullbytes());
00948 bool ok;
00949 switch (type) {
00950 case START_BUNDLE:
00951 ok = handle_start_bundle();
00952 break;
00953 case END_BUNDLE:
00954 ok = handle_end_bundle();
00955 break;
00956 case DATA_SEGMENT:
00957 ok = handle_data_segment();
00958 break;
00959 case ACK_SEGMENT:
00960 ok = handle_ack_segment();
00961 break;
00962 case KEEPALIVE:
00963 ok = handle_keepalive();
00964 break;
00965 case SHUTDOWN:
00966 ok = handle_shutdown();
00967 break;
00968 default:
00969 log_err("invalid CL message type 0x%x", type);
00970 break_contact(ContactEvent::BROKEN);
00971 return;
00972 }
00973
00974
00975
00976 if (! ok) {
00977 if (recvbuf_.fullbytes() == recvbuf_.size()) {
00978 log_warn("process_data: "
00979 "%zu byte recv buffer full but too small for msg %u... "
00980 "doubling buffer size",
00981 recvbuf_.size(), type);
00982
00983 recvbuf_.reserve(recvbuf_.size() * 2);
00984
00985 } else if (recvbuf_.tailbytes() == 0) {
00986
00987 recvbuf_.reserve(recvbuf_.size() - recvbuf_.fullbytes());
00988 ASSERT(recvbuf_.tailbytes() != 0);
00989 }
00990
00991 return;
00992 }
00993 }
00994 }
00995
00996
00997 void
00998 StreamConvergenceLayer::Connection::note_data_rcvd()
00999 {
01000 log_debug("noting data_rcvd");
01001 ::gettimeofday(&data_rcvd_, 0);
01002 }
01003
01004
01005 void
01006 StreamConvergenceLayer::Connection::note_data_sent()
01007 {
01008 log_debug("noting data_sent");
01009 ::gettimeofday(&data_sent_, 0);
01010 }
01011
01012
01013 bool
01014 StreamConvergenceLayer::Connection::handle_start_bundle()
01015 {
01016 recvbuf_.consume(1);
01017
01018 if (! incoming_.empty()) {
01019 IncomingBundle* incoming = incoming_.back();
01020 if (incoming->total_length_ == 0) {
01021 log_err("protocol error: got START_BUNDLE before bundle completed");
01022 break_contact(ContactEvent::BROKEN);
01023 return false;
01024 }
01025 }
01026
01027 log_debug("got START_BUNDLE message, creating new IncomingBundle");
01028 IncomingBundle* incoming = new IncomingBundle(NULL);
01029 incoming_.push_back(incoming);
01030
01031 return true;
01032 }
01033
01034
01035 bool
01036 StreamConvergenceLayer::Connection::handle_end_bundle()
01037 {
01038 recvbuf_.consume(1);
01039
01040 if (incoming_.empty()) {
01041 log_err("protocol error: got END_BUNDLE with no incoming bundle");
01042
01043 break_contact(ContactEvent::BROKEN);
01044 return false;
01045 }
01046
01047 IncomingBundle* incoming = incoming_.back();
01048
01049 if (incoming->rcvd_data_.empty()) {
01050 log_err("protocol error: got END_BUNDLE with no DATA_SEGMENT");
01051
01052
01053 break_contact(ContactEvent::BROKEN);
01054 return false;
01055 }
01056
01057
01058
01059
01060
01061
01062
01063 incoming->total_length_ = incoming->rcvd_data_.last() + 1;
01064
01065 size_t formatted_len =
01066 BundleProtocol::formatted_length(incoming->bundle_.object());
01067
01068 log_debug("got END_BUNDLE: rcvd %zu / %zu",
01069 incoming->total_length_, formatted_len);
01070
01071 if (incoming->total_length_ > formatted_len) {
01072 log_err("protocol error: received too much data -- "
01073 "got %zu, formatted length %zu",
01074 incoming->total_length_, formatted_len);
01075
01076
01077
01078
01079 incoming->rcvd_data_.clear();
01080
01081
01082 break_contact(ContactEvent::BROKEN);
01083 return false;
01084 }
01085
01086
01087
01088 size_t payload_rcvd = incoming->total_length_ -
01089 incoming->header_block_length_;
01090
01091 incoming->bundle_->payload_.close_file();
01092
01093 BundleDaemon::post(
01094 new BundleReceivedEvent(incoming->bundle_.object(),
01095 EVENTSRC_PEER,
01096 payload_rcvd));
01097
01098 return true;
01099 }
01100
01101
01102 bool
01103 StreamConvergenceLayer::Connection::handle_data_segment()
01104 {
01105 if (incoming_.empty()) {
01106 log_err("protocol error: got data segment before START_BUNDLE");
01107 break_contact(ContactEvent::BROKEN);
01108 return false;
01109 }
01110
01111 u_char* bp = (u_char*)recvbuf_.start();
01112
01113 size_t segment_offset;
01114 u_int32_t segment_len;
01115
01116 int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, &segment_len);
01117
01118 if (sdnv_len < 0) {
01119 log_debug("handle_data_segment: too few bytes for sdnv (%zu)",
01120 recvbuf_.fullbytes());
01121 return false;
01122 }
01123
01124
01125
01126
01127 IncomingBundle* incoming = incoming_.back();
01128
01129
01130
01131
01132
01133 if (incoming->rcvd_data_.empty()) {
01134 Bundle* bundle = new Bundle();
01135
01136 size_t rcvd_bytes = recvbuf_.fullbytes() - 1 - sdnv_len;
01137 int header_len = BundleProtocol::parse_header_blocks(bundle,
01138 bp + 1 + sdnv_len,
01139 rcvd_bytes);
01140 if (header_len < 0) {
01141 log_debug("handle_data_segment: "
01142 "not enough data to parse bundle headers [have %zu]",
01143 recvbuf_.fullbytes());
01144 delete bundle;
01145 return false;
01146 }
01147
01148 log_debug("handle_data_segment: "
01149 "got header segments [len %u] for bundle *%p",
01150 header_len, bundle);
01151
01152 incoming->bundle_ = bundle;
01153 incoming->header_block_length_ = header_len;
01154 recvbuf_.consume(1 + sdnv_len + header_len);
01155 incoming->rcvd_data_.set(0, header_len);
01156
01157 segment_offset = 0;
01158 recv_segment_todo_ = segment_len - header_len;
01159
01160 } else {
01161
01162 segment_offset = incoming->rcvd_data_.num_contiguous();
01163 size_t payload_offset = segment_offset - incoming->header_block_length_;
01164 (void)payload_offset;
01165
01166 log_debug("handle_data_segment: "
01167 "got segment of length %u at offset %zu "
01168 "(payload offset %zu/%zu)",
01169 segment_len, segment_offset, payload_offset,
01170 incoming->bundle_->payload_.length());
01171
01172 if (segment_offset != incoming->ack_data_.last() + 1) {
01173 log_err("XXX/demmer segment offset %zu last ack %zu",
01174 segment_offset, incoming->ack_data_.last());
01175 }
01176
01177 recvbuf_.consume(1 + sdnv_len);
01178
01179 recv_segment_todo_ = segment_len;
01180 }
01181
01182 incoming->ack_data_.set(segment_offset);
01183 incoming->ack_data_.set(segment_offset + segment_len - 1);
01184
01185 log_debug("handle_data_segment: "
01186 "updated recv_data (segment_offset %zu) *%p ack_data *%p",
01187 segment_offset, &incoming->rcvd_data_, &incoming->ack_data_);
01188
01189 if (recv_segment_todo_ != 0) {
01190 handle_data_todo();
01191 }
01192
01193 return true;
01194 }
01195
01196
01197 bool
01198 StreamConvergenceLayer::Connection::handle_data_todo()
01199 {
01200
01201
01202 ASSERT(!incoming_.empty());
01203 ASSERT(recv_segment_todo_ != 0);
01204
01205
01206
01207
01208 IncomingBundle* incoming = incoming_.back();
01209
01210 size_t rcvd_offset = incoming->rcvd_data_.num_contiguous();
01211 size_t rcvd_len = recvbuf_.fullbytes();
01212 size_t payload_offset = rcvd_offset - incoming->header_block_length_;
01213 size_t payload_len = incoming->bundle_->payload_.length();
01214 size_t chunk_len = std::min(rcvd_len, recv_segment_todo_);
01215
01216 if (rcvd_len == 0) {
01217 return false;
01218 }
01219
01220 log_debug("handle_data_todo: "
01221 "reading todo segment %zu/%zu at payload offset %zu/%zu",
01222 chunk_len, recv_segment_todo_, payload_offset, payload_len);
01223
01224 if (chunk_len + payload_offset > payload_len) {
01225 log_err("NEED TO HANDLE BLOCKS AFTER THE PAYLOAD");
01226 break_contact(ContactEvent::BROKEN);
01227 return false;
01228 }
01229
01230 u_char* bp = (u_char*)recvbuf_.start();
01231 incoming->bundle_->payload_.write_data(bp, payload_offset, chunk_len);
01232
01233 recv_segment_todo_ -= chunk_len;
01234 recvbuf_.consume(chunk_len);
01235
01236 incoming->rcvd_data_.set(rcvd_offset, chunk_len);
01237
01238 log_debug("handle_data_todo: "
01239 "updated recv_data (rcvd_offset %zu) *%p ack_data *%p",
01240 rcvd_offset, &incoming->rcvd_data_, &incoming->ack_data_);
01241
01242 if (recv_segment_todo_ == 0) {
01243 return true;
01244 }
01245
01246 return false;
01247 }
01248
01249
01250 bool
01251 StreamConvergenceLayer::Connection::handle_ack_segment()
01252 {
01253 u_char* bp = (u_char*)recvbuf_.start();
01254 u_int32_t acked_len;
01255 int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, &acked_len);
01256
01257 if (sdnv_len < 0) {
01258 log_debug("handle_ack_segment: too few bytes for sdnv (%zu)",
01259 recvbuf_.fullbytes());
01260 return false;
01261 }
01262
01263 recvbuf_.consume(1 + sdnv_len);
01264
01265 if (inflight_.empty()) {
01266 log_err("protocol error: got ack segment with no inflight bundle");
01267 break_contact(ContactEvent::BROKEN);
01268 return false;
01269 }
01270
01271 InFlightBundle* inflight = inflight_.front();
01272
01273 size_t ack_begin;
01274 DataBitmap::iterator i = inflight->ack_data_.begin();
01275 if (i == inflight->ack_data_.end()) {
01276 ack_begin = 0;
01277 } else {
01278 i.skip_contiguous();
01279 ack_begin = *i + 1;
01280 }
01281
01282 if (acked_len < ack_begin) {
01283 log_err("protocol error: got ack for length %u but already acked up to %zu",
01284 acked_len, ack_begin);
01285 break_contact(ContactEvent::BROKEN);
01286 return false;
01287 }
01288
01289 inflight->ack_data_.set(0, acked_len);
01290
01291
01292
01293
01294 if (acked_len == inflight->formatted_length_) {
01295 log_debug("handle_ack_segment: "
01296 "got final ack for %zu byte range -- acked_len %u, ack_data *%p",
01297 (size_t)acked_len - ack_begin, acked_len, &inflight->ack_data_);
01298
01299
01300
01301 BundleDaemon::post(
01302 new BundleTransmittedEvent(inflight->bundle_.object(),
01303 contact_,
01304 inflight->sent_data_.num_contiguous() -
01305 inflight->header_block_length_,
01306 inflight->ack_data_.num_contiguous() -
01307 inflight->header_block_length_));
01308
01309
01310 check_completed(inflight);
01311
01312 } else {
01313 log_debug("handle_ack_segment: "
01314 "got acked_len %u (%zu byte range) -- ack_data *%p",
01315 acked_len, (size_t)acked_len - ack_begin, &inflight->ack_data_);
01316 }
01317
01318 return true;
01319 }
01320
01321
01322 bool
01323 StreamConvergenceLayer::Connection::handle_keepalive()
01324 {
01325 log_debug("got keepalive message");
01326 recvbuf_.consume(1);
01327 return true;
01328 }
01329
01330
01331 bool
01332 StreamConvergenceLayer::Connection::handle_shutdown()
01333 {
01334 log_debug("got SHUTDOWN byte");
01335 recvbuf_.consume(1);
01336 break_contact(ContactEvent::SHUTDOWN);
01337
01338 return false;
01339 }
01340
01341 }