00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <oasys/util/OptParser.h>
00019 #include "StreamConvergenceLayer.h"
00020 #include "bundling/AnnounceBundle.h"
00021 #include "bundling/BundleDaemon.h"
00022 #include "bundling/SDNV.h"
00023 #include "bundling/TempBundle.h"
00024 #include "contacts/ContactManager.h"
00025
00026 namespace dtn {
00027
00028
00029 StreamConvergenceLayer::StreamLinkParams::StreamLinkParams(bool init_defaults)
00030 : LinkParams(init_defaults),
00031 segment_ack_enabled_(true),
00032 negative_ack_enabled_(true),
00033 keepalive_interval_(10),
00034 segment_length_(4096)
00035 {
00036 }
00037
00038
00039 StreamConvergenceLayer::StreamConvergenceLayer(const char* logpath,
00040 const char* cl_name,
00041 u_int8_t cl_version)
00042 : ConnectionConvergenceLayer(logpath, cl_name),
00043 cl_version_(cl_version)
00044 {
00045 }
00046
00047
00048 bool
00049 StreamConvergenceLayer::parse_link_params(LinkParams* lparams,
00050 int argc, const char** argv,
00051 const char** invalidp)
00052 {
00053
00054
00055 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(lparams);
00056 ASSERT(params != NULL);
00057
00058 oasys::OptParser p;
00059
00060 p.addopt(new oasys::BoolOpt("segment_ack_enabled",
00061 ¶ms->segment_ack_enabled_));
00062
00063 p.addopt(new oasys::BoolOpt("negative_ack_enabled",
00064 ¶ms->negative_ack_enabled_));
00065
00066 p.addopt(new oasys::UIntOpt("keepalive_interval",
00067 ¶ms->keepalive_interval_));
00068
00069 p.addopt(new oasys::UIntOpt("segment_length",
00070 ¶ms->segment_length_));
00071
00072 p.addopt(new oasys::UInt8Opt("cl_version",
00073 &cl_version_));
00074
00075 int count = p.parse_and_shift(argc, argv, invalidp);
00076 if (count == -1) {
00077 return false;
00078 }
00079 argc -= count;
00080
00081 return ConnectionConvergenceLayer::parse_link_params(lparams, argc, argv,
00082 invalidp);
00083 }
00084
00085
00086 bool
00087 StreamConvergenceLayer::finish_init_link(Link* link, LinkParams* lparams)
00088 {
00089 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(lparams);
00090 ASSERT(params != NULL);
00091
00092
00093 if (params->segment_ack_enabled_) {
00094 link->set_reliable(true);
00095 }
00096
00097 return true;
00098 }
00099
00100
00101 void
00102 StreamConvergenceLayer::dump_link(Link* link, oasys::StringBuffer* buf)
00103 {
00104 ConnectionConvergenceLayer::dump_link(link, buf);
00105
00106 StreamLinkParams* params =
00107 dynamic_cast<StreamLinkParams*>(link->cl_info());
00108 ASSERT(params != NULL);
00109
00110 buf->appendf("segment_ack_enabled: %u\n", params->segment_ack_enabled_);
00111 buf->appendf("negative_ack_enabled: %u\n", params->negative_ack_enabled_);
00112 buf->appendf("keepalive_interval: %u\n", params->keepalive_interval_);
00113 buf->appendf("segment_length: %u\n", params->segment_length_);
00114 }
00115
00116
00117 StreamConvergenceLayer::Connection::Connection(const char* classname,
00118 const char* logpath,
00119 StreamConvergenceLayer* cl,
00120 StreamLinkParams* params,
00121 bool active_connector)
00122 : CLConnection(classname, logpath, cl, params, active_connector),
00123 current_inflight_(NULL),
00124 send_segment_todo_(0),
00125 recv_segment_todo_(0),
00126 breaking_contact_(false)
00127 {
00128 }
00129
00130
00131 void
00132 StreamConvergenceLayer::Connection::initiate_contact()
00133 {
00134 log_debug("initiate_contact called");
00135
00136
00137 ContactHeader contacthdr;
00138 contacthdr.magic = htonl(MAGIC);
00139 contacthdr.version = ((StreamConvergenceLayer*)cl_)->cl_version_;
00140
00141 contacthdr.flags = 0;
00142
00143 StreamLinkParams* params = stream_lparams();
00144
00145 if (params->segment_ack_enabled_)
00146 contacthdr.flags |= SEGMENT_ACK_ENABLED;
00147
00148 if (params->reactive_frag_enabled_)
00149 contacthdr.flags |= REACTIVE_FRAG_ENABLED;
00150
00151 contacthdr.keepalive_interval = htons(params->keepalive_interval_);
00152
00153
00154 ASSERT(sendbuf_.fullbytes() == 0);
00155 if (sendbuf_.tailbytes() < sizeof(ContactHeader)) {
00156 log_warn("send buffer too short: %zu < needed %zu",
00157 sendbuf_.tailbytes(), sizeof(ContactHeader));
00158 sendbuf_.reserve(sizeof(ContactHeader));
00159 }
00160
00161 memcpy(sendbuf_.start(), &contacthdr, sizeof(ContactHeader));
00162 sendbuf_.fill(sizeof(ContactHeader));
00163
00164
00165 BundleDaemon* bd = BundleDaemon::instance();
00166 size_t local_eid_len = bd->local_eid().length();
00167 size_t sdnv_len = SDNV::encoding_len(local_eid_len);
00168
00169 if (sendbuf_.tailbytes() < sdnv_len + local_eid_len) {
00170 log_warn("send buffer too short: %zu < needed %zu",
00171 sendbuf_.tailbytes(), sdnv_len + local_eid_len);
00172 sendbuf_.reserve(sdnv_len + local_eid_len);
00173 }
00174
00175 sdnv_len = SDNV::encode(local_eid_len,
00176 (u_char*)sendbuf_.end(),
00177 sendbuf_.tailbytes());
00178 sendbuf_.fill(sdnv_len);
00179
00180 memcpy(sendbuf_.end(), bd->local_eid().data(), local_eid_len);
00181 sendbuf_.fill(local_eid_len);
00182
00183
00184 note_data_sent();
00185 send_data();
00186
00187
00188
00189
00190
00191
00192 ::gettimeofday(&data_rcvd_, 0);
00193 ::gettimeofday(&data_sent_, 0);
00194 ::gettimeofday(&keepalive_sent_, 0);
00195
00196
00197
00198 }
00199
00200
00201 void
00202 StreamConvergenceLayer::Connection::handle_contact_initiation()
00203 {
00204 ASSERT(! contact_up_);
00205
00206
00207
00208
00209 size_t len_needed = sizeof(ContactHeader);
00210 if (recvbuf_.fullbytes() < len_needed) {
00211 tooshort:
00212 log_debug("handle_contact_initiation: not enough data received "
00213 "(need > %zu, got %zu)",
00214 len_needed, recvbuf_.fullbytes());
00215 return;
00216 }
00217
00218
00219
00220
00221 u_int64_t peer_eid_len;
00222 int sdnv_len = SDNV::decode((u_char*)recvbuf_.start() +
00223 sizeof(ContactHeader),
00224 recvbuf_.fullbytes() -
00225 sizeof(ContactHeader),
00226 &peer_eid_len);
00227 if (sdnv_len < 0) {
00228 goto tooshort;
00229 }
00230
00231 len_needed = sizeof(ContactHeader) + sdnv_len + peer_eid_len;
00232 if (recvbuf_.fullbytes() < len_needed) {
00233 goto tooshort;
00234 }
00235
00236
00237
00238
00239 ContactHeader contacthdr;
00240 memcpy(&contacthdr, recvbuf_.start(), sizeof(ContactHeader));
00241
00242 contacthdr.magic = ntohl(contacthdr.magic);
00243 contacthdr.keepalive_interval = ntohs(contacthdr.keepalive_interval);
00244
00245 recvbuf_.consume(sizeof(ContactHeader));
00246
00247
00248
00249
00250 if (contacthdr.magic != MAGIC) {
00251 log_warn("remote sent magic number 0x%.8x, expected 0x%.8x "
00252 "-- disconnecting.", contacthdr.magic, MAGIC);
00253 break_contact(ContactEvent::CL_ERROR);
00254 return;
00255 }
00256
00257
00258
00259
00260
00261
00262
00263 u_int8_t cl_version = ((StreamConvergenceLayer*)cl_)->cl_version_;
00264 if (contacthdr.version < cl_version) {
00265 log_warn("remote sent version %d, expected version %d "
00266 "-- disconnecting.", contacthdr.version, cl_version);
00267 break_contact(ContactEvent::CL_VERSION);
00268 return;
00269 }
00270
00271
00272
00273
00274 StreamLinkParams* params = stream_lparams();
00275
00276 params->keepalive_interval_ =
00277 std::min(params->keepalive_interval_,
00278 (u_int)contacthdr.keepalive_interval);
00279
00280 params->segment_ack_enabled_ = params->segment_ack_enabled_ &&
00281 (contacthdr.flags & SEGMENT_ACK_ENABLED);
00282
00283 params->reactive_frag_enabled_ = params->reactive_frag_enabled_ &&
00284 (contacthdr.flags & REACTIVE_FRAG_ENABLED);
00285
00286 params->negative_ack_enabled_ = params->negative_ack_enabled_ &&
00287 (contacthdr.flags & NEGATIVE_ACK_ENABLED);
00288
00289
00290
00291
00292
00293 if (params->keepalive_interval_ != 0 &&
00294 (params->keepalive_interval_ * 1000) < params->data_timeout_)
00295 {
00296 poll_timeout_ = params->keepalive_interval_ * 1000;
00297 }
00298
00299
00300
00301
00302
00303 recvbuf_.consume(sdnv_len);
00304
00305
00306
00307
00308
00309
00310 EndpointID peer_eid;
00311 if (! peer_eid.assign(recvbuf_.start(), peer_eid_len)) {
00312 log_err("protocol error: invalid endpoint id '%s' (len %llu)",
00313 peer_eid.c_str(), U64FMT(peer_eid_len));
00314 break_contact(ContactEvent::CL_ERROR);
00315 return;
00316 }
00317
00318 find_contact(peer_eid);
00319 recvbuf_.consume(peer_eid_len);
00320
00321
00322
00323
00324 contact_up();
00325 }
00326
00327
00328
00329 void
00330 StreamConvergenceLayer::Connection::handle_send_bundle(Bundle* bundle)
00331 {
00332
00333
00334 InFlightBundle* inflight = new InFlightBundle(bundle);
00335 inflight->blocks_ = bundle->xmit_blocks_.find_blocks(contact_->link());
00336 ASSERT(inflight->blocks_ != NULL);
00337 inflight->total_length_ = BundleProtocol::total_length(inflight->blocks_);
00338 inflight_.push_back(inflight);
00339 }
00340
00341
00342 bool
00343 StreamConvergenceLayer::Connection::send_pending_data()
00344 {
00345
00346
00347 if (sendbuf_.tailbytes() == 0) {
00348 return false;
00349 }
00350
00351
00352
00353
00354
00355 if (send_segment_todo_ != 0) {
00356 ASSERT(current_inflight_ != NULL);
00357 send_data_todo(current_inflight_);
00358 }
00359
00360
00361 if (contact_broken_ || (send_segment_todo_ != 0)) {
00362 return false;
00363 }
00364
00365
00366
00367
00368
00369 bool sent_ack = send_pending_acks();
00370
00371
00372 if (contact_broken_)
00373 {
00374 return sent_ack;
00375 }
00376
00377
00378
00379 bool sent_data;
00380 if (current_inflight_ == NULL) {
00381 sent_data = start_next_bundle();
00382 } else {
00383
00384 sent_data = send_next_segment(current_inflight_);
00385 }
00386
00387 return sent_ack || sent_data;
00388 }
00389
00390
00391 bool
00392 StreamConvergenceLayer::Connection::send_pending_acks()
00393 {
00394 if (contact_broken_ || incoming_.empty()) {
00395 return false;
00396 }
00397 IncomingBundle* incoming = incoming_.front();
00398 DataBitmap::iterator iter = incoming->ack_data_.begin();
00399 bool generated_ack = false;
00400
00401
00402
00403
00404 if (iter == incoming->ack_data_.end()) {
00405 goto check_done;
00406 }
00407
00408
00409
00410
00411
00412 while (1) {
00413 size_t rcvd_bytes = incoming->rcvd_data_.num_contiguous();
00414 size_t ack_len = *iter + 1;
00415 size_t segment_len = ack_len - incoming->acked_length_;
00416 (void)segment_len;
00417
00418 if (ack_len > rcvd_bytes) {
00419 log_debug("send_pending_acks: "
00420 "waiting to send ack length %zu for %zu byte segment "
00421 "since only received %zu",
00422 ack_len, segment_len, rcvd_bytes);
00423 break;
00424 }
00425
00426
00427 size_t encoding_len = 1 + SDNV::encoding_len(ack_len);
00428 if (encoding_len > sendbuf_.tailbytes()) {
00429 log_debug("send_pending_acks: "
00430 "no space for ack in buffer (need %zu, have %zu)",
00431 encoding_len, sendbuf_.tailbytes());
00432 break;
00433 }
00434
00435 log_debug("send_pending_acks: "
00436 "sending ack length %zu for %zu byte segment "
00437 "[range %u..%u] ack_data *%p",
00438 ack_len, segment_len, incoming->acked_length_, *iter,
00439 &incoming->ack_data_);
00440
00441 *sendbuf_.end() = ACK_SEGMENT;
00442 int len = SDNV::encode(ack_len, (u_char*)sendbuf_.end() + 1,
00443 sendbuf_.tailbytes() - 1);
00444 ASSERT(encoding_len = len + 1);
00445 sendbuf_.fill(encoding_len);
00446
00447 generated_ack = true;
00448 incoming->acked_length_ = ack_len;
00449 incoming->ack_data_.clear(*iter);
00450 iter = incoming->ack_data_.begin();
00451
00452 if (iter == incoming->ack_data_.end()) {
00453
00454
00455 break;
00456 }
00457
00458 log_debug("send_pending_acks: "
00459 "found another segment (%u)", *iter);
00460 }
00461
00462 if (generated_ack) {
00463 send_data();
00464 note_data_sent();
00465 }
00466
00467
00468
00469
00470 check_done:
00471 if ((incoming->total_length_ != 0) &&
00472 (incoming->total_length_ == incoming->acked_length_))
00473 {
00474 log_debug("send_pending_acks: acked all %u bytes of bundle %d",
00475 incoming->total_length_, incoming->bundle_->bundleid_);
00476
00477 incoming_.pop_front();
00478 delete incoming;
00479 }
00480 else
00481 {
00482 log_debug("send_pending_acks: "
00483 "still need to send acks -- acked_range %u",
00484 incoming->ack_data_.num_contiguous());
00485 }
00486
00487
00488 return generated_ack;
00489 }
00490
00491
00492
00493 bool
00494 StreamConvergenceLayer::Connection::start_next_bundle()
00495 {
00496 ASSERT(current_inflight_ == NULL);
00497
00498
00499
00500 InFlightList::iterator iter;
00501 for (iter = inflight_.begin(); iter != inflight_.end(); ++iter) {
00502 InFlightBundle* inflight = *iter;
00503
00504 if (contact_broken_)
00505 return false;
00506
00507
00508 if (inflight->send_complete_)
00509 {
00510 ASSERT(inflight->sent_data_.num_contiguous() ==
00511 inflight->total_length_);
00512
00513 log_debug("start_next_bundle: "
00514 "transmission of bundle %d already complete, skipping",
00515 inflight->bundle_->bundleid_);
00516 continue;
00517 }
00518
00519
00520
00521 ASSERT(inflight->sent_data_.empty());
00522 current_inflight_ = inflight;
00523 break;
00524 }
00525
00526
00527
00528 if (current_inflight_ == NULL) {
00529 return false;
00530 }
00531
00532
00533 return send_next_segment(current_inflight_);
00534 }
00535
00536
00537 bool
00538 StreamConvergenceLayer::Connection::send_next_segment(InFlightBundle* inflight)
00539 {
00540 if (sendbuf_.tailbytes() == 0) {
00541 return false;
00542 }
00543
00544 ASSERT(send_segment_todo_ == 0);
00545
00546 StreamLinkParams* params = stream_lparams();
00547
00548 size_t bytes_sent = inflight->sent_data_.empty() ? 0 :
00549 inflight->sent_data_.last() + 1;
00550
00551 if (bytes_sent == inflight->total_length_) {
00552 log_debug("send_next_segment: "
00553 "already sent all %zu bytes, finishing bundle",
00554 bytes_sent);
00555 ASSERT(inflight->send_complete_);
00556 return finish_bundle(inflight);
00557 }
00558
00559 u_int8_t flags = 0;
00560 size_t segment_len;
00561
00562 if (bytes_sent == 0) {
00563 flags |= BUNDLE_START;
00564 }
00565
00566 if (params->segment_length_ >= inflight->total_length_ - bytes_sent) {
00567 flags |= BUNDLE_END;
00568 segment_len = inflight->total_length_ - bytes_sent;
00569 } else {
00570 segment_len = params->segment_length_;
00571 }
00572
00573 size_t sdnv_len = SDNV::encoding_len(segment_len);
00574
00575 if (sendbuf_.tailbytes() < 1 + sdnv_len) {
00576 log_debug("send_next_segment: "
00577 "not enough space for segment header [need %zu, have %zu]",
00578 1 + sdnv_len, sendbuf_.tailbytes());
00579 return false;
00580 }
00581
00582 log_debug("send_next_segment: "
00583 "starting %zu byte segment [block byte range %zu..%zu]",
00584 segment_len, bytes_sent, bytes_sent + segment_len);
00585
00586 u_char* bp = (u_char*)sendbuf_.end();
00587 *bp++ = DATA_SEGMENT | flags;
00588 int cc = SDNV::encode(segment_len, bp, sendbuf_.tailbytes() - 1);
00589 ASSERT(cc == (int)sdnv_len);
00590 bp += sdnv_len;
00591
00592 sendbuf_.fill(1 + sdnv_len);
00593 send_segment_todo_ = segment_len;
00594
00595
00596 return send_data_todo(inflight);
00597 }
00598
00599
00600 bool
00601 StreamConvergenceLayer::Connection::send_data_todo(InFlightBundle* inflight)
00602 {
00603 ASSERT(send_segment_todo_ != 0);
00604
00605
00606
00607 while (send_segment_todo_ != 0 && sendbuf_.tailbytes() != 0) {
00608 size_t bytes_sent = inflight->sent_data_.empty() ? 0 :
00609 inflight->sent_data_.last() + 1;
00610 size_t send_len = std::min(send_segment_todo_, sendbuf_.tailbytes());
00611
00612 Bundle* bundle = inflight->bundle_.object();
00613 BlockInfoVec* blocks = inflight->blocks_;
00614
00615 size_t ret =
00616 BundleProtocol::produce(bundle, blocks, (u_char*)sendbuf_.end(),
00617 bytes_sent, send_len,
00618 &inflight->send_complete_);
00619 ASSERT(ret == send_len);
00620 sendbuf_.fill(send_len);
00621 inflight->sent_data_.set(bytes_sent, send_len);
00622
00623 log_debug("send_data_todo: "
00624 "sent %zu/%zu of current segment from block offset %zu "
00625 "(%zu todo), updated sent_data *%p",
00626 send_len, send_segment_todo_, bytes_sent,
00627 send_segment_todo_ - send_len, &inflight->sent_data_);
00628
00629 send_segment_todo_ -= send_len;
00630
00631 note_data_sent();
00632 send_data();
00633
00634 if (contact_broken_)
00635 return true;
00636 }
00637
00638 return (send_segment_todo_ == 0);
00639 }
00640
00641
00642 bool
00643 StreamConvergenceLayer::Connection::finish_bundle(InFlightBundle* inflight)
00644 {
00645 ASSERT(inflight->send_complete_);
00646
00647 ASSERT(current_inflight_ == inflight);
00648 current_inflight_ = NULL;
00649
00650 check_completed(inflight);
00651 check_unblock_link();
00652
00653 return true;
00654 }
00655
00656
00657 void
00658 StreamConvergenceLayer::Connection::check_completed(InFlightBundle* inflight)
00659 {
00660
00661
00662
00663
00664
00665
00666 if (current_inflight_ == inflight) {
00667 log_debug("check_completed: bundle %d still waiting for finish_bundle",
00668 inflight->bundle_->bundleid_);
00669 return;
00670 }
00671
00672 u_int32_t acked_len = inflight->ack_data_.num_contiguous();
00673 if (acked_len < inflight->total_length_) {
00674 log_debug("check_completed: bundle %d only acked %u/%u",
00675 inflight->bundle_->bundleid_,
00676 acked_len, inflight->total_length_);
00677 return;
00678 }
00679
00680 log_debug("check_completed: bundle %d transmission complete",
00681 inflight->bundle_->bundleid_);
00682 ASSERT(inflight == inflight_.front());
00683 inflight_.pop_front();
00684 delete inflight;
00685 }
00686
00687
00688 void
00689 StreamConvergenceLayer::Connection::send_keepalive()
00690 {
00691
00692
00693
00694
00695 if (sendbuf_.fullbytes() != 0) {
00696 log_debug("send_keepalive: "
00697 "send buffer has %zu bytes queued, suppressing keepalive",
00698 sendbuf_.fullbytes());
00699 return;
00700 }
00701 ASSERT(sendbuf_.tailbytes() > 0);
00702
00703 ::gettimeofday(&keepalive_sent_, 0);
00704
00705 *(sendbuf_.end()) = KEEPALIVE;
00706 sendbuf_.fill(1);
00707
00708
00709
00710 send_data();
00711 }
00712
00713 void
00714 StreamConvergenceLayer::Connection::handle_cancel_bundle(Bundle* bundle)
00715 {
00716 (void)bundle;
00717 }
00718
00719
00720 void
00721 StreamConvergenceLayer::Connection::handle_poll_timeout()
00722 {
00723
00724
00725
00726
00727
00728
00729
00730 if (BundleDaemon::shutting_down())
00731 {
00732 sleep(1);
00733 return;
00734 }
00735
00736 struct timeval now;
00737 u_int elapsed, elapsed2;
00738
00739 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(params_);
00740 ASSERT(params != NULL);
00741
00742 ::gettimeofday(&now, 0);
00743
00744
00745
00746 elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
00747 if (elapsed > params->data_timeout_) {
00748 log_info("handle_poll_timeout: no data heard for %d msecs "
00749 "(keepalive_sent %u.%u, data_rcvd %u.%u, now %u.%u, poll_timeout %d) "
00750 "-- closing contact",
00751 elapsed,
00752 (u_int)keepalive_sent_.tv_sec,
00753 (u_int)keepalive_sent_.tv_usec,
00754 (u_int)data_rcvd_.tv_sec, (u_int)data_rcvd_.tv_usec,
00755 (u_int)now.tv_sec, (u_int)now.tv_usec,
00756 poll_timeout_);
00757
00758 break_contact(ContactEvent::BROKEN);
00759 return;
00760 }
00761
00762
00763 ContactManager* cm = BundleDaemon::instance()->contactmgr();
00764 oasys::ScopeLock l(cm->lock(),"StreamConvergenceLayer::Connection::handle_poll_timeout");
00765 if(contact_ == NULL)
00766 {
00767 return;
00768 }
00769
00770
00771
00772 if (contact_->link()->type() == Link::ONDEMAND) {
00773 u_int idle_close_time = contact_->link()->params().idle_close_time_;
00774
00775 elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
00776 elapsed2 = TIMEVAL_DIFF_MSEC(now, data_sent_);
00777
00778 if (idle_close_time != 0 &&
00779 (elapsed > idle_close_time * 1000) &&
00780 (elapsed2 > idle_close_time * 1000))
00781 {
00782 log_info("closing idle connection "
00783 "(no data received for %d msecs or sent for %d msecs)",
00784 elapsed, elapsed2);
00785 break_contact(ContactEvent::IDLE);
00786 return;
00787 } else {
00788 log_debug("connection not idle: recvd %d / sent %d <= timeout %d",
00789 elapsed, elapsed2, idle_close_time * 1000);
00790 }
00791 }
00792
00793
00794
00795
00796 check_keepalive();
00797 }
00798
00799
00800 void
00801 StreamConvergenceLayer::Connection::check_keepalive()
00802 {
00803 struct timeval now;
00804 u_int elapsed, elapsed2;
00805
00806 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(params_);
00807 ASSERT(params != NULL);
00808
00809 ::gettimeofday(&now, 0);
00810
00811 if (params->keepalive_interval_ != 0) {
00812 elapsed = TIMEVAL_DIFF_MSEC(now, data_sent_);
00813 elapsed2 = TIMEVAL_DIFF_MSEC(now, keepalive_sent_);
00814
00815
00816
00817
00818
00819
00820
00821 if (std::min(elapsed, elapsed2) > ((params->keepalive_interval_ * 1000) - 500))
00822 {
00823 send_keepalive();
00824 }
00825 }
00826
00827
00828
00829 if (contact_ != NULL &&
00830 contact_->link()->state() == Link::BUSY &&
00831 num_pending_.value == 0)
00832 {
00833 elapsed = TIMEVAL_DIFF_MSEC(now, data_sent_);
00834 if (elapsed > 5000) {
00835 log_warn("0 bundles pending and %d msecs since last xmit, "
00836 "clearing BUSY state",
00837 elapsed);
00838
00839
00840 BundleDaemon::post_at_head(
00841 new LinkStateChangeRequest(contact_->link(),
00842 Link::AVAILABLE,
00843 ContactEvent::UNBLOCKED));
00844 }
00845 }
00846 }
00847
00848
00849 void
00850 StreamConvergenceLayer::Connection::process_data()
00851 {
00852 if (recvbuf_.fullbytes() == 0) {
00853 return;
00854 }
00855
00856 log_debug("processing up to %zu bytes from receive buffer",
00857 recvbuf_.fullbytes());
00858
00859
00860
00861
00862 note_data_rcvd();
00863
00864
00865
00866
00867 if (! contact_up_) {
00868 handle_contact_initiation();
00869 return;
00870 }
00871
00872
00873
00874
00875
00876 if (recv_segment_todo_ != 0) {
00877 bool ok = handle_data_todo();
00878
00879 if (!ok) {
00880 return;
00881 }
00882 }
00883
00884
00885
00886
00887
00888
00889 while (recvbuf_.fullbytes() != 0) {
00890 if (contact_broken_) return;
00891
00892 u_int8_t type = *recvbuf_.start() & 0xf0;
00893 u_int8_t flags = *recvbuf_.start() & 0x0f;
00894
00895 log_debug("recvbuf has %zu full bytes, dispatching to handler routine",
00896 recvbuf_.fullbytes());
00897 bool ok;
00898 switch (type) {
00899 case DATA_SEGMENT:
00900 ok = handle_data_segment(flags);
00901 break;
00902 case ACK_SEGMENT:
00903 ok = handle_ack_segment(flags);
00904 break;
00905 case REFUSE_BUNDLE:
00906 ok = handle_refuse_bundle(flags);
00907 break;
00908 case KEEPALIVE:
00909 ok = handle_keepalive(flags);
00910 break;
00911 case SHUTDOWN:
00912 ok = handle_shutdown(flags);
00913 break;
00914 default:
00915 log_err("invalid CL message type code 0x%x (flags 0x%x)",
00916 type >> 4, flags);
00917 break_contact(ContactEvent::CL_ERROR);
00918 return;
00919 }
00920
00921
00922
00923 if (! ok) {
00924 if (recvbuf_.fullbytes() == recvbuf_.size()) {
00925 log_warn("process_data: "
00926 "%zu byte recv buffer full but too small for msg %u... "
00927 "doubling buffer size",
00928 recvbuf_.size(), type);
00929
00930 recvbuf_.reserve(recvbuf_.size() * 2);
00931
00932 } else if (recvbuf_.tailbytes() == 0) {
00933
00934 recvbuf_.reserve(recvbuf_.size() - recvbuf_.fullbytes());
00935 ASSERT(recvbuf_.tailbytes() != 0);
00936 }
00937
00938 return;
00939 }
00940 }
00941 }
00942
00943
00944 void
00945 StreamConvergenceLayer::Connection::note_data_rcvd()
00946 {
00947 log_debug("noting data_rcvd");
00948 ::gettimeofday(&data_rcvd_, 0);
00949 }
00950
00951
00952 void
00953 StreamConvergenceLayer::Connection::note_data_sent()
00954 {
00955 log_debug("noting data_sent");
00956 ::gettimeofday(&data_sent_, 0);
00957 }
00958
00959
00960 bool
00961 StreamConvergenceLayer::Connection::handle_data_segment(u_int8_t flags)
00962 {
00963 if (flags & BUNDLE_START)
00964 {
00965
00966
00967 if (!incoming_.empty())
00968 {
00969 IncomingBundle* incoming = incoming_.back();
00970 if (incoming->total_length_ == 0) {
00971 log_err("protocol error: "
00972 "got BUNDLE_START before bundle completed");
00973 break_contact(ContactEvent::CL_ERROR);
00974 return false;
00975 }
00976 }
00977
00978 log_debug("got BUNDLE_START segment, creating new IncomingBundle");
00979 IncomingBundle* incoming = new IncomingBundle(new Bundle());
00980 incoming_.push_back(incoming);
00981 }
00982 else if (incoming_.empty())
00983 {
00984 log_err("protocol error: "
00985 "first data segment doesn't have BUNDLE_START flag set");
00986 break_contact(ContactEvent::CL_ERROR);
00987 return false;
00988 }
00989
00990
00991
00992
00993 IncomingBundle* incoming = incoming_.back();
00994 u_char* bp = (u_char*)recvbuf_.start();
00995
00996
00997 u_int32_t segment_len;
00998 int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, &segment_len);
00999
01000 if (sdnv_len < 0) {
01001 log_debug("handle_data_segment: too few bytes in buffer for sdnv (%zu)",
01002 recvbuf_.fullbytes());
01003 return false;
01004 }
01005
01006 recvbuf_.consume(1 + sdnv_len);
01007
01008 if (segment_len == 0) {
01009 log_err("protocol error -- zero length segment");
01010 break_contact(ContactEvent::CL_ERROR);
01011 return false;
01012 }
01013
01014 size_t segment_offset = incoming->rcvd_data_.num_contiguous();
01015 log_debug("handle_data_segment: "
01016 "got segment of length %u at offset %zu ",
01017 segment_len, segment_offset);
01018
01019 incoming->ack_data_.set(segment_offset + segment_len - 1);
01020
01021 log_debug("handle_data_segment: "
01022 "updated ack_data (segment_offset %zu) *%p ack_data *%p",
01023 segment_offset, &incoming->rcvd_data_, &incoming->ack_data_);
01024
01025
01026
01027
01028
01029 if (flags & BUNDLE_END)
01030 {
01031 incoming->total_length_ = incoming->rcvd_data_.num_contiguous() +
01032 segment_len;
01033
01034 log_debug("got BUNDLE_END: total length %u",
01035 incoming->total_length_);
01036 }
01037
01038 recv_segment_todo_ = segment_len;
01039 return handle_data_todo();
01040 }
01041
01042
01043 bool
01044 StreamConvergenceLayer::Connection::handle_data_todo()
01045 {
01046
01047
01048 ASSERT(!incoming_.empty());
01049 ASSERT(recv_segment_todo_ != 0);
01050
01051
01052
01053
01054 IncomingBundle* incoming = incoming_.back();
01055 size_t rcvd_offset = incoming->rcvd_data_.num_contiguous();
01056 size_t rcvd_len = recvbuf_.fullbytes();
01057 size_t chunk_len = std::min(rcvd_len, recv_segment_todo_);
01058
01059 if (rcvd_len == 0) {
01060 return false;
01061 }
01062
01063 log_debug("handle_data_todo: "
01064 "reading todo segment %zu/%zu at offset %zu",
01065 chunk_len, recv_segment_todo_, rcvd_offset);
01066
01067 bool last;
01068 int cc = BundleProtocol::consume(incoming->bundle_.object(),
01069 (u_char*)recvbuf_.start(),
01070 chunk_len, &last);
01071 if (cc < 0) {
01072 log_err("protocol error parsing bundle data segment");
01073 break_contact(ContactEvent::CL_ERROR);
01074 return false;
01075 }
01076
01077 ASSERT(cc == (int)chunk_len);
01078
01079 recv_segment_todo_ -= chunk_len;
01080 recvbuf_.consume(chunk_len);
01081
01082 incoming->rcvd_data_.set(rcvd_offset, chunk_len);
01083
01084 log_debug("handle_data_todo: "
01085 "updated recv_data (rcvd_offset %zu) *%p ack_data *%p",
01086 rcvd_offset, &incoming->rcvd_data_, &incoming->ack_data_);
01087
01088 if (recv_segment_todo_ == 0) {
01089 check_completed(incoming);
01090 return true;
01091 }
01092
01093 return false;
01094 }
01095
01096
01097 void
01098 StreamConvergenceLayer::Connection::check_completed(IncomingBundle* incoming)
01099 {
01100 u_int32_t rcvd_len = incoming->rcvd_data_.num_contiguous();
01101
01102
01103
01104 if (incoming->total_length_ == 0) {
01105 return;
01106 }
01107
01108 u_int32_t formatted_len =
01109 BundleProtocol::total_length(&incoming->bundle_->recv_blocks_);
01110
01111 log_debug("check_completed: rcvd %u / %u (formatted length %u)",
01112 rcvd_len, incoming->total_length_, formatted_len);
01113
01114 if (rcvd_len < incoming->total_length_) {
01115 return;
01116 }
01117
01118 if (rcvd_len > incoming->total_length_) {
01119 log_err("protocol error: received too much data -- "
01120 "got %u, total length %u",
01121 rcvd_len, incoming->total_length_);
01122
01123
01124
01125
01126 protocol_err:
01127 incoming->rcvd_data_.clear();
01128 break_contact(ContactEvent::CL_ERROR);
01129 return;
01130 }
01131
01132
01133
01134 if (incoming->total_length_ != formatted_len) {
01135 log_err("protocol error: CL total length %u "
01136 "doesn't match bundle protocol total %u",
01137 incoming->total_length_, formatted_len);
01138 goto protocol_err;
01139
01140 }
01141
01142 BundleDaemon::post(
01143 new BundleReceivedEvent(incoming->bundle_.object(),
01144 EVENTSRC_PEER,
01145 incoming->total_length_,
01146 contact_.object()));
01147 }
01148
01149
01150 bool
01151 StreamConvergenceLayer::Connection::handle_ack_segment(u_int8_t flags)
01152 {
01153 (void)flags;
01154 u_char* bp = (u_char*)recvbuf_.start();
01155 u_int32_t acked_len;
01156 int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, &acked_len);
01157
01158 if (sdnv_len < 0) {
01159 log_debug("handle_ack_segment: too few bytes for sdnv (%zu)",
01160 recvbuf_.fullbytes());
01161 return false;
01162 }
01163
01164 recvbuf_.consume(1 + sdnv_len);
01165
01166 if (inflight_.empty()) {
01167 log_err("protocol error: got ack segment with no inflight bundle");
01168 break_contact(ContactEvent::CL_ERROR);
01169 return false;
01170 }
01171
01172 InFlightBundle* inflight = inflight_.front();
01173
01174 size_t ack_begin;
01175 DataBitmap::iterator i = inflight->ack_data_.begin();
01176 if (i == inflight->ack_data_.end()) {
01177 ack_begin = 0;
01178 } else {
01179 i.skip_contiguous();
01180 ack_begin = *i + 1;
01181 }
01182
01183 if (acked_len < ack_begin) {
01184 log_err("protocol error: got ack for length %u but already acked up to %zu",
01185 acked_len, ack_begin);
01186 break_contact(ContactEvent::CL_ERROR);
01187 return false;
01188 }
01189
01190 inflight->ack_data_.set(0, acked_len);
01191
01192
01193
01194
01195 if (acked_len == inflight->total_length_) {
01196 log_debug("handle_ack_segment: got final ack for %zu byte range -- "
01197 "acked_len %u, ack_data *%p",
01198 (size_t)acked_len - ack_begin,
01199 acked_len, &inflight->ack_data_);
01200
01201 BundleDaemon::post(
01202 new BundleTransmittedEvent(inflight->bundle_.object(),
01203 contact_,
01204 contact_->link(),
01205 inflight->sent_data_.num_contiguous(),
01206 inflight->ack_data_.num_contiguous()));
01207
01208
01209 check_completed(inflight);
01210
01211 } else {
01212 log_debug("handle_ack_segment: "
01213 "got acked_len %u (%zu byte range) -- ack_data *%p",
01214 acked_len, (size_t)acked_len - ack_begin, &inflight->ack_data_);
01215 }
01216
01217 return true;
01218 }
01219
01220
01221 bool
01222 StreamConvergenceLayer::Connection::handle_refuse_bundle(u_int8_t flags)
01223 {
01224 (void)flags;
01225 log_debug("got refuse_bundle message");
01226 log_err("REFUSE_BUNDLE not implemented");
01227 break_contact(ContactEvent::CL_ERROR);
01228 return true;
01229 }
01230
01231 bool
01232 StreamConvergenceLayer::Connection::handle_keepalive(u_int8_t flags)
01233 {
01234 (void)flags;
01235 log_debug("got keepalive message");
01236 recvbuf_.consume(1);
01237 return true;
01238 }
01239
01240
01241 void
01242 StreamConvergenceLayer::Connection::break_contact(ContactEvent::reason_t reason)
01243 {
01244
01245
01246
01247 if (breaking_contact_) {
01248 return;
01249 }
01250 breaking_contact_ = true;
01251
01252
01253
01254
01255 bool send_shutdown = false;
01256 shutdown_reason_t shutdown_reason = SHUTDOWN_NO_REASON;
01257
01258 switch (reason) {
01259 case ContactEvent::USER:
01260
01261 send_shutdown = true;
01262 shutdown_reason = SHUTDOWN_BUSY;
01263 break;
01264
01265 case ContactEvent::IDLE:
01266
01267 send_shutdown = true;
01268 shutdown_reason = SHUTDOWN_IDLE_TIMEOUT;
01269 break;
01270
01271 case ContactEvent::SHUTDOWN:
01272
01273
01274
01275 send_shutdown = true;
01276 break;
01277
01278 case ContactEvent::BROKEN:
01279 case ContactEvent::CL_ERROR:
01280
01281 send_shutdown = false;
01282 break;
01283
01284 case ContactEvent::CL_VERSION:
01285
01286 send_shutdown = true;
01287 shutdown_reason = SHUTDOWN_VERSION_MISMATCH;
01288 break;
01289
01290 case ContactEvent::INVALID:
01291 case ContactEvent::NO_INFO:
01292 case ContactEvent::RECONNECT:
01293 case ContactEvent::TIMEOUT:
01294 case ContactEvent::UNBLOCKED:
01295 NOTREACHED;
01296 break;
01297 }
01298
01299
01300
01301
01302
01303
01304
01305 if (send_shutdown &&
01306 sendbuf_.fullbytes() == 0 &&
01307 send_segment_todo_ == 0)
01308 {
01309 log_debug("break_contact: sending shutdown");
01310 char typecode = SHUTDOWN;
01311 if (shutdown_reason != SHUTDOWN_NO_REASON) {
01312 typecode |= SHUTDOWN_HAS_REASON;
01313 }
01314
01315
01316
01317 *sendbuf_.end() = typecode;
01318 sendbuf_.fill(1);
01319
01320 if (shutdown_reason != SHUTDOWN_NO_REASON) {
01321 *sendbuf_.end() = shutdown_reason;
01322 sendbuf_.fill(1);
01323 }
01324
01325 send_data();
01326 }
01327
01328 CLConnection::break_contact(reason);
01329 }
01330
01331
01332 bool
01333 StreamConvergenceLayer::Connection::handle_shutdown(u_int8_t flags)
01334 {
01335 log_debug("got SHUTDOWN byte");
01336 size_t shutdown_len = 1;
01337
01338 if (flags & SHUTDOWN_HAS_REASON)
01339 {
01340 shutdown_len += 1;
01341 }
01342
01343 if (flags & SHUTDOWN_HAS_DELAY)
01344 {
01345 shutdown_len += 2;
01346 }
01347
01348 if (recvbuf_.tailbytes() < shutdown_len)
01349 {
01350
01351
01352 return false;
01353 }
01354
01355
01356 recvbuf_.consume(1);
01357
01358 shutdown_reason_t reason = SHUTDOWN_NO_REASON;
01359 if (flags & SHUTDOWN_HAS_REASON)
01360 {
01361 switch (*recvbuf_.start()) {
01362 case SHUTDOWN_NO_REASON:
01363 reason = SHUTDOWN_NO_REASON;
01364 break;
01365 case SHUTDOWN_IDLE_TIMEOUT:
01366 reason = SHUTDOWN_IDLE_TIMEOUT;
01367 break;
01368 case SHUTDOWN_VERSION_MISMATCH:
01369 reason = SHUTDOWN_VERSION_MISMATCH;
01370 break;
01371 case SHUTDOWN_BUSY:
01372 reason = SHUTDOWN_BUSY;
01373 break;
01374 default:
01375 log_err("invalid shutdown reason code 0x%x", *recvbuf_.start());
01376 }
01377
01378 recvbuf_.consume(1);
01379 }
01380
01381 u_int16_t delay = 0;
01382 if (flags & SHUTDOWN_HAS_DELAY)
01383 {
01384 memcpy(&delay, recvbuf_.start(), 2);
01385 delay = ntohs(delay);
01386 recvbuf_.consume(2);
01387 }
01388
01389 log_info("got SHUTDOWN (%s) [reconnect delay %u]",
01390 shutdown_reason_to_str(reason), delay);
01391
01392 break_contact(ContactEvent::SHUTDOWN);
01393
01394 return false;
01395 }
01396
01397 }