StreamConvergenceLayer.cc

Go to the documentation of this file.
00001 /*
00002  * IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING. By
00003  * downloading, copying, installing or using the software you agree to
00004  * this license. If you do not agree to this license, do not download,
00005  * install, copy or use the software.
00006  * 
00007  * Intel Open Source License 
00008  * 
00009  * Copyright (c) 2006 Intel Corporation. All rights reserved. 
00010  * 
00011  * Redistribution and use in source and binary forms, with or without
00012  * modification, are permitted provided that the following conditions are
00013  * met:
00014  * 
00015  *   Redistributions of source code must retain the above copyright
00016  *   notice, this list of conditions and the following disclaimer.
00017  * 
00018  *   Redistributions in binary form must reproduce the above copyright
00019  *   notice, this list of conditions and the following disclaimer in the
00020  *   documentation and/or other materials provided with the distribution.
00021  * 
00022  *   Neither the name of the Intel Corporation nor the names of its
00023  *   contributors may be used to endorse or promote products derived from
00024  *   this software without specific prior written permission.
00025  *  
00026  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00027  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00028  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
00029  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE INTEL OR
00030  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00031  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00032  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00033  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00034  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00035  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00036  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00037  */
00038 
00039 #include <oasys/util/OptParser.h>
00040 #include "StreamConvergenceLayer.h"
00041 #include "bundling/AnnounceBundle.h"
00042 #include "bundling/BundleDaemon.h"
00043 #include "bundling/SDNV.h"
00044 #include "bundling/TempBundle.h"
00045 #include "contacts/ContactManager.h"
00046 
00047 namespace dtn {
00048 
00049 //----------------------------------------------------------------------
00050 StreamConvergenceLayer::StreamLinkParams::StreamLinkParams(bool init_defaults)
00051     : LinkParams(init_defaults),
00052       segment_ack_enabled_(true),
00053       keepalive_interval_(10),
00054       segment_length_(4096)
00055 {
00056 }
00057 
00058 //----------------------------------------------------------------------
00059 StreamConvergenceLayer::StreamConvergenceLayer(const char* logpath,
00060                                                const char* cl_name,
00061                                                u_int8_t    cl_version)
00062     : ConnectionConvergenceLayer(logpath, cl_name),
00063       cl_version_(cl_version)
00064 {
00065 }
00066 
00067 //----------------------------------------------------------------------
00068 bool
00069 StreamConvergenceLayer::parse_link_params(LinkParams* lparams,
00070                                           int argc, const char** argv,
00071                                           const char** invalidp)
00072 {
00073     // all subclasses should create a params structure that derives
00074     // from StreamLinkParams
00075     StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(lparams);
00076     ASSERT(params != NULL);
00077                                
00078     oasys::OptParser p;
00079 
00080     p.addopt(new oasys::BoolOpt("segment_ack_enabled",
00081                                 &params->segment_ack_enabled_));
00082     
00083     p.addopt(new oasys::UIntOpt("keepalive_interval",
00084                                 &params->keepalive_interval_));
00085     
00086     p.addopt(new oasys::UIntOpt("segment_length",
00087                                 &params->segment_length_));
00088     
00089     int count = p.parse_and_shift(argc, argv, invalidp);
00090     if (count == -1) {
00091         return false;
00092     }
00093     argc -= count;
00094 
00095     return ConnectionConvergenceLayer::parse_link_params(lparams, argc, argv,
00096                                                          invalidp);
00097 }
00098 
00099 //----------------------------------------------------------------------
00100 bool
00101 StreamConvergenceLayer::finish_init_link(Link* link, LinkParams* lparams)
00102 {
00103     StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(lparams);
00104     ASSERT(params != NULL);
00105 
00106     // make sure to set the reliability bit in the link structure
00107     if (params->segment_ack_enabled_) {
00108         link->set_reliable(true);
00109     }
00110     
00111     return true;
00112 }
00113 
00114 //----------------------------------------------------------------------
00115 void
00116 StreamConvergenceLayer::dump_link(Link* link, oasys::StringBuffer* buf)
00117 {
00118     ConnectionConvergenceLayer::dump_link(link, buf);
00119     
00120     StreamLinkParams* params =
00121         dynamic_cast<StreamLinkParams*>(link->cl_info());
00122     ASSERT(params != NULL);
00123     
00124     buf->appendf("segment_ack_enabled: %u\n", params->segment_ack_enabled_);
00125     buf->appendf("keepalive_interval: %u\n", params->keepalive_interval_);
00126     buf->appendf("segment_length: %u\n", params->segment_length_);
00127 }
00128 
00129 //----------------------------------------------------------------------
00130 StreamConvergenceLayer::Connection::Connection(const char* classname,
00131                                                const char* logpath,
00132                                                ConvergenceLayer* cl,
00133                                                StreamLinkParams* params,
00134                                                bool active_connector)
00135     : CLConnection(classname, logpath, cl, params, active_connector),
00136       current_inflight_(NULL),
00137       send_segment_todo_(0),
00138       recv_segment_todo_(0)
00139 {
00140 }
00141 
00142 //----------------------------------------------------------------------
00143 void
00144 StreamConvergenceLayer::Connection::initiate_contact()
00145 {
00146     log_debug("initiate_contact called");
00147 
00148     // format the contact header
00149     ContactHeader contacthdr;
00150     contacthdr.magic   = htonl(MAGIC);
00151     contacthdr.version = ((StreamConvergenceLayer*)cl_)->cl_version_;
00152     
00153     contacthdr.flags = 0;
00154 
00155     StreamLinkParams* params = stream_lparams();
00156     
00157     if (params->segment_ack_enabled_)
00158         contacthdr.flags |= SEGMENT_ACK_ENABLED;
00159     
00160     if (params->reactive_frag_enabled_)
00161         contacthdr.flags |= REACTIVE_FRAG_ENABLED;
00162     
00163     contacthdr.keepalive_interval = htons(params->keepalive_interval_);
00164 
00165     // copy the contact header into the send buffer
00166     ASSERT(sendbuf_.fullbytes() == 0);
00167     if (sendbuf_.tailbytes() < sizeof(ContactHeader)) {
00168         log_warn("send buffer too short: %zu < needed %zu",
00169                  sendbuf_.tailbytes(), sizeof(ContactHeader));
00170         sendbuf_.reserve(sizeof(ContactHeader));
00171     }
00172     
00173     memcpy(sendbuf_.start(), &contacthdr, sizeof(ContactHeader));
00174     sendbuf_.fill(sizeof(ContactHeader));
00175     
00176     // format the announce bundle and copy it into the send buffer,
00177     // preceded by an SDNV of its length
00178     TempBundle announce;
00179     BundleDaemon* bd = BundleDaemon::instance();
00180     AnnounceBundle::create_announce_bundle(&announce, bd->local_eid());
00181     size_t announce_len = BundleProtocol::formatted_length(&announce);
00182     size_t sdnv_len = SDNV::encoding_len(announce_len);
00183 
00184     if (sendbuf_.tailbytes() < sdnv_len + announce_len) {
00185         log_warn("send buffer too short: %zu < needed %zu",
00186                  sendbuf_.tailbytes(), sdnv_len + announce_len);
00187         sendbuf_.reserve(sdnv_len + announce_len);
00188     }
00189     
00190     sdnv_len = SDNV::encode(announce_len,
00191                             (u_char*)sendbuf_.end(),
00192                             sendbuf_.tailbytes());
00193     sendbuf_.fill(sdnv_len);
00194 
00195     int ret = BundleProtocol::format_bundle(&announce,
00196                                             (u_char*)sendbuf_.end(),
00197                                             sendbuf_.tailbytes());
00198     
00199     ASSERTF(ret > 0, "BundleProtocol::formatted_length disagreed with "
00200             "BundleProtocol::format_bundle");
00201     sendbuf_.fill(ret);
00202 
00203     // drain the send buffer
00204     note_data_sent();
00205     send_data();
00206 }
00207 
00208 //----------------------------------------------------------------------
00209 void
00210 StreamConvergenceLayer::Connection::handle_contact_initiation()
00211 {
00212     ASSERT(! contact_up_);
00213 
00214     /*
00215      * First check that we have enough data for the contact header
00216      */
00217     size_t len_needed = sizeof(ContactHeader);
00218     if (recvbuf_.fullbytes() < len_needed) {
00219  tooshort:
00220         log_debug("handle_contact_initiation: not enough data received "
00221                   "(need > %zu, got %zu)",
00222                   len_needed, recvbuf_.fullbytes());
00223         return;
00224     }
00225 
00226     /*
00227      * Now check for enough data for the announce bundle.
00228      */
00229     u_int64_t announce_len;
00230     int sdnv_len = SDNV::decode((u_char*)recvbuf_.start() +
00231                                   sizeof(ContactHeader),
00232                                 recvbuf_.fullbytes() -
00233                                   sizeof(ContactHeader),
00234                                 &announce_len);
00235     if (sdnv_len < 0) {
00236         goto tooshort;
00237     }
00238     
00239     len_needed = sizeof(ContactHeader) + sdnv_len + announce_len;
00240     if (recvbuf_.fullbytes() < len_needed) {
00241         goto tooshort;
00242     }
00243     
00244     /*
00245      * Ok, we have enough data, parse the contact header.
00246      */
00247     ContactHeader contacthdr;
00248     memcpy(&contacthdr, recvbuf_.start(), sizeof(ContactHeader));
00249 
00250     contacthdr.magic              = ntohl(contacthdr.magic);
00251     contacthdr.keepalive_interval = ntohs(contacthdr.keepalive_interval);
00252 
00253     recvbuf_.consume(sizeof(ContactHeader));
00254     
00255     /*
00256      * Check for valid magic number and version.
00257      */
00258     if (contacthdr.magic != MAGIC) {
00259         log_warn("remote sent magic number 0x%.8x, expected 0x%.8x "
00260                  "-- disconnecting.", contacthdr.magic, MAGIC);
00261         break_contact(ContactEvent::BROKEN);
00262         return;
00263     }
00264     
00265     u_int8_t cl_version = ((StreamConvergenceLayer*)cl_)->cl_version_;
00266     if (contacthdr.version != cl_version) {
00267         log_warn("remote sent version %d, expected version %d "
00268                  "-- disconnecting.", contacthdr.version, cl_version);
00269         break_contact(ContactEvent::BROKEN);
00270         return;
00271     }
00272 
00273     /*
00274      * Now do parameter negotiation.
00275      */
00276     StreamLinkParams* params = stream_lparams();
00277     
00278     params->keepalive_interval_ =
00279         std::min(params->keepalive_interval_,
00280                  (u_int)contacthdr.keepalive_interval);
00281 
00282     params->segment_ack_enabled_ = params->segment_ack_enabled_ &&
00283                                    (contacthdr.flags & SEGMENT_ACK_ENABLED);
00284     
00285     params->reactive_frag_enabled_ = params->reactive_frag_enabled_ &&
00286                                      (contacthdr.flags & REACTIVE_FRAG_ENABLED);
00287 
00288     /*
00289      * Now skip the sdnv that encodes the announce bundle length since
00290      * we parsed it above.
00291      */
00292     recvbuf_.consume(sdnv_len);
00293 
00294     /*
00295      * Finally, parse the announce bundle and give it to the base
00296      * class to handle (i.e. by linking us to a Contact if we don't
00297      * have one).
00298      */
00299     TempBundle announce;
00300     int ret = BundleProtocol::parse_bundle(&announce,
00301                                            (u_char*)recvbuf_.start(),
00302                                            recvbuf_.fullbytes());
00303     
00304     if (ret != (int)announce_len) {
00305         if (ret < 0) {
00306             log_err("protocol error: announce bundle length given as %llu, "
00307                     "parser requires more bytes", announce_len);
00308         } else {
00309             log_err("protocol error: announce bundle length given as %llu, "
00310                     "parser requires only %u", announce_len, ret);
00311         }
00312         break_contact(ContactEvent::BROKEN);
00313         return;
00314     }
00315 
00316     handle_announce_bundle(&announce);
00317     recvbuf_.consume(announce_len);
00318 
00319     /*
00320      * Now we initialize the various timers that are used for
00321      * keepalives / idle timeouts to make sure they're not used
00322      * uninitialized.
00323      */
00324     ::gettimeofday(&data_rcvd_, 0);
00325     ::gettimeofday(&data_sent_, 0);
00326     ::gettimeofday(&keepalive_sent_, 0);
00327 
00328     /*
00329      * Finally, we note that the contact is now up.
00330      */
00331     contact_up();
00332 }
00333 
00334 
00335 //----------------------------------------------------------------------
00336 void
00337 StreamConvergenceLayer::Connection::handle_send_bundle(Bundle* bundle)
00338 {
00339     // push the bundle onto the inflight queue. we'll handle sending
00340     // the bundle out in the callback for transmit_bundle_data
00341     InFlightBundle* inflight = new InFlightBundle(bundle);
00342 
00343     inflight->header_block_length_ =
00344         BundleProtocol::header_block_length(bundle);
00345 
00346     inflight->tail_block_length_ =
00347         BundleProtocol::tail_block_length(bundle);
00348 
00349     inflight->formatted_length_ =
00350         inflight->header_block_length_ +
00351         inflight->bundle_->payload_.length() +
00352         inflight->tail_block_length_;
00353 
00354     inflight_.push_back(inflight);
00355 }
00356 
00357 //----------------------------------------------------------------------
00358 bool
00359 StreamConvergenceLayer::Connection::send_pending_data()
00360 {
00361     // if the outgoing data buffer is full, we can't do anything until
00362     // we poll()
00363     if (sendbuf_.tailbytes() == 0) {
00364         return false;
00365     }
00366 
00367     // if we're in the middle of sending a segment, we need to continue
00368     // sending it. only if we completely send the segment do we fall
00369     // through to send acks, otherwise we return to try to finish it
00370     // again later.
00371     if (send_segment_todo_ != 0) {
00372         ASSERT(current_inflight_ != NULL);        
00373         send_data_todo(current_inflight_);
00374     }
00375     
00376     // see if we're broken or write blocked
00377     if (contact_broken_ || (send_segment_todo_ != 0)) {
00378         return false;
00379     }
00380     
00381     // now check if there are acks we need to send -- even if it
00382     // returns true, we continue on and try to send some real payload
00383     // data, otherwise we could get starved by arriving data and never
00384     // send anything out.
00385     bool sent_ack = send_pending_acks();
00386     
00387     // check if we need to start a new bundle. if we do, then
00388     // start_next_bundle handles the correct return code
00389     bool sent_data;
00390     if (current_inflight_ == NULL) {
00391         sent_data = start_next_bundle();
00392     } else {
00393         // otherwise send the next segment of the current bundle
00394         sent_data = send_next_segment(current_inflight_);
00395     }
00396 
00397     return sent_ack || sent_data;
00398 }
00399 
00400 //----------------------------------------------------------------------
00401 bool
00402 StreamConvergenceLayer::Connection::send_pending_acks()
00403 {
00404     if (contact_broken_ || incoming_.empty()) {
00405         return false; // nothing to do
00406     }
00407     
00408     IncomingBundle* incoming = incoming_.front();
00409 
00410     if (incoming->ack_data_.empty()) {
00411         return false; // nothing to do
00412     }
00413 
00414     // when data segment headers are received, the first and last bit of
00415     // the segment are marked in ack_data.
00416     //
00417     // therefore, to figure out what acks should be sent for those
00418     // segments, we set the start iterator to be the last byte in the
00419     // contiguous range at the beginning of ack_data_, thereby
00420     // skipping everything that's already been acked. we then set the
00421     // end iterator to the next byte to be acked (i.e. the last byte
00422     // in the range that was received). the length to be acked is
00423     // therefore the range up to and including the end byte
00424     //
00425     // however, we have to be careful to check the recv_data as well
00426     // to make sure we've actually gotten the segment.
00427     DataBitmap::iterator start = incoming->ack_data_.begin();
00428     start.skip_contiguous();
00429     DataBitmap::iterator end = start + 1;
00430 
00431     size_t received_bytes = incoming->rcvd_data_.last() + 1;
00432 
00433     int sent_start = -1;
00434     size_t last_ack = 0;
00435     while (end != incoming->ack_data_.end()) {
00436         size_t ack_len   = *end + 1;
00437         size_t segment_len = ack_len - *start;
00438         (void)segment_len;
00439 
00440         if (ack_len > received_bytes) {
00441             log_debug("send_pending_acks: "
00442                       "waiting to send ack length %zu for %zu byte segment "
00443                       "since only received %zu",
00444                       ack_len, segment_len, received_bytes);
00445             break;
00446         }
00447 
00448         // make sure we have space in the send buffer
00449         size_t encoding_len = 1 + SDNV::encoding_len(ack_len);
00450         if (encoding_len <= sendbuf_.tailbytes()) {
00451             log_debug("send_pending_acks: "
00452                       "sending ack length %zu for %zu byte segment "
00453                       "[range %zu..%zu]",
00454                       ack_len, segment_len, *start, *end);
00455 
00456             if (sent_start == -1) {
00457                 sent_start = *start;
00458             }
00459             
00460             ASSERT(last_ack <= ack_len);
00461             last_ack = ack_len;
00462             
00463             *sendbuf_.end() = ACK_SEGMENT;
00464             int len = SDNV::encode(ack_len, (u_char*)sendbuf_.end() + 1,
00465                                    sendbuf_.tailbytes() - 1);
00466             ASSERT(encoding_len = len + 1);
00467             sendbuf_.fill(encoding_len);
00468 
00469         } else {
00470             log_debug("send_pending_acks: "
00471                       "no space for ack in buffer (need %zu, have %zu)",
00472                       encoding_len, sendbuf_.tailbytes());
00473             break;
00474         }
00475 
00476         // advance the two iterators to the next segment
00477         start = end + 1;
00478         if (start == incoming->ack_data_.end()) {
00479             break;
00480         }
00481         
00482         end  = start + 1;
00483 
00484         log_debug("send_pending_acks: "
00485                   "found another gap (%zu, %zu)", *start, *end);
00486     }
00487 
00488     if (last_ack != 0) {
00489         incoming->ack_data_.set(0, last_ack);
00490         
00491         log_debug("send_pending_acks: "
00492                   "updated ack_data for ack length %zu: *%p ",
00493                   last_ack, &incoming->ack_data_);
00494         
00495         send_data();
00496         note_data_sent();
00497     }
00498 
00499     // now, check if a) we've gotten everything we're supposed to
00500     // (i.e. total_length_ isn't zero), and b) we're done with all the
00501     // acks we need to send
00502     if ((incoming->total_length_ != 0) &&
00503         (incoming->ack_data_.num_contiguous() == incoming->total_length_))
00504     {
00505         log_debug("send_pending_acks: acked all %zu bytes of bundle %d",
00506                   incoming->total_length_, incoming->bundle_->bundleid_);
00507         
00508         incoming_.pop_front();
00509         delete incoming;
00510     }
00511     else
00512     {
00513         log_debug("send_pending_acks: "
00514                   "still need to send acks -- total_rcvd %zu, acked_range %zu",
00515                   incoming->total_length_,
00516                   incoming->ack_data_.num_contiguous());
00517     }
00518 
00519     // return true if we've sent something
00520     return (last_ack != 0);
00521 }
00522 
00523          
00524 //----------------------------------------------------------------------
00525 bool
00526 StreamConvergenceLayer::Connection::start_next_bundle()
00527 {
00528     ASSERT(current_inflight_ == NULL);
00529 
00530     // find the bundle to start (identified by having nothing yet in
00531     // sent_data) and store it in current_inflight_
00532     InFlightList::iterator iter;
00533     for (iter = inflight_.begin(); iter != inflight_.end(); ++iter) {
00534         InFlightBundle* inflight = *iter;
00535         
00536         if (contact_broken_)
00537             return false;
00538 
00539         // skip entries that we've sent completely
00540         if (inflight->sent_data_.num_contiguous() ==
00541             inflight->formatted_length_)
00542         {
00543             log_debug("start_next_bundle: "
00544                       "transmission of bundle %d already complete, skipping",
00545                       inflight->bundle_->bundleid_);
00546             continue;
00547         }
00548 
00549         // otherwise, we must not have sent anything for this bundle
00550         // at all, so assert that sent_data is empty
00551         ASSERT(inflight->sent_data_.empty());
00552         current_inflight_ = inflight;
00553         break;
00554     }
00555 
00556     // there might not be anything to send, in which case we return
00557     // false to indicate as such
00558     if (current_inflight_ == NULL) {
00559         return false;
00560     }
00561 
00562     Bundle* bundle = current_inflight_->bundle_.object();
00563     StreamLinkParams* params = stream_lparams();
00564     
00565     // to start off the transmission, we send the START_BUNDLE
00566     // typecode, followed by a segment of bundle data, including at a
00567     // minimum the bundle headers.
00568 
00569     size_t header_len = BundleProtocol::header_block_length(bundle);
00570     size_t segment_len = std::min((size_t)params->segment_length_,
00571                                   current_inflight_->formatted_length_);
00572     if (segment_len < header_len) {
00573         log_warn("configured segment_len %zu smaller than header segment "
00574                  "length %zu... upping segment length", segment_len, header_len);
00575         segment_len = header_len;
00576     }
00577     
00578     size_t sdnv_len = SDNV::encoding_len(segment_len);
00579 
00580     // we need to format all the headers at once, so make sure the
00581     // buffer has enough space for that. note that the segment on the
00582     // wire might include some payload bytes that we don't need to
00583     // handle here.
00584     size_t min_buffer_len = 1 + 1 + sdnv_len + header_len;
00585     if (min_buffer_len > sendbuf_.tailbytes()) {
00586         if (min_buffer_len > sendbuf_.size()) {
00587             log_warn("start_bundle: "
00588                      "not enough space for headers [need %zu, have %zu] "
00589                      "(expanding buffer)",
00590                      min_buffer_len, sendbuf_.size());
00591             sendbuf_.reserve(min_buffer_len);
00592             ASSERT(min_buffer_len <= sendbuf_.tailbytes());
00593             
00594         } else {
00595             log_debug("start_bundle: "
00596                       "not enough space for headers [need %zu, have %zu] "
00597                       "(waiting for buffer to drain)",
00598                       min_buffer_len, sendbuf_.tailbytes());
00599             
00600             current_inflight_ = NULL;
00601             return false;
00602         }
00603     }
00604 
00605     log_debug("send_start_bundle: sending "
00606               "START_BUNDLE for bundle %d: initial segment len %zu [hdrs %zu]",
00607               current_inflight_->bundle_->bundleid_, segment_len, header_len);
00608     
00609     u_char* bp = (u_char*)sendbuf_.end();
00610     size_t len = sendbuf_.tailbytes();
00611 
00612     *bp++ = START_BUNDLE;
00613     len--;
00614     
00615     *bp++ = DATA_SEGMENT;
00616     len--;
00617     
00618     int cc = SDNV::encode(segment_len, bp, len);
00619     ASSERT(cc == (int)sdnv_len);
00620         
00621     bp  += sdnv_len;
00622     len -= sdnv_len;
00623     
00624     cc = BundleProtocol::format_header_blocks(bundle, bp, len);
00625     ASSERT(cc == (int)header_len);
00626     
00627     sendbuf_.fill(1 + 1 + sdnv_len + header_len);
00628     current_inflight_->sent_data_.set(0, header_len);
00629 
00630     // now that the header is done, we record the amount of the first
00631     // segment that's left to send and then call send_data_todo to send
00632     // out the payload. if there's nothing left to do (i.e. the first
00633     // segment is exactly the length of the headers), then we return
00634     // true to indicate that we want to make another pass through
00635     send_segment_todo_ = segment_len - header_len;
00636 
00637     if (send_segment_todo_ == 0) {
00638         note_data_sent();
00639         send_data();
00640         return true;
00641     }
00642 
00643     return send_data_todo(current_inflight_);
00644 }
00645 
00646 //----------------------------------------------------------------------
00647 bool
00648 StreamConvergenceLayer::Connection::send_next_segment(InFlightBundle* inflight)
00649 {
00650     if (sendbuf_.tailbytes() == 0) {
00651         return false;
00652     }
00653 
00654     ASSERT(send_segment_todo_ == 0);
00655 
00656     StreamLinkParams* params = stream_lparams();
00657     Bundle* bundle = inflight->bundle_.object();
00658 
00659     // we must have already sent at least the header segment
00660     ASSERT(!inflight->sent_data_.empty());
00661 
00662     size_t bytes_sent   = inflight->sent_data_.last() + 1;
00663     size_t payload_sent = bytes_sent - inflight->header_block_length_;
00664     size_t payload_len  = bundle->payload_.length();
00665 
00666     if (payload_sent == payload_len) {
00667         log_debug("send_next_segment: "
00668                   "already sent all %zu bytes of payload, finishing bundle",
00669                   payload_len);
00670         return finish_bundle(inflight);
00671     }
00672     
00673     size_t segment_len = std::min((size_t)params->segment_length_,
00674                                   payload_len - payload_sent);
00675     
00676     ASSERT(payload_sent + segment_len <= payload_len);
00677 
00678     size_t sdnv_len = SDNV::encoding_len(segment_len);
00679     
00680     if (sendbuf_.tailbytes() < 1 + sdnv_len) {
00681         log_debug("send_next_segment: "
00682                   "not enough space for segment header [need %zu, have %zu]",
00683                   1 + sdnv_len, sendbuf_.tailbytes());
00684         return false;
00685     }
00686     
00687     log_debug("send_next_segment: "
00688               "starting %zu byte segment [payload range %zu..%zu]",
00689               segment_len, payload_sent, payload_sent + segment_len);
00690 
00691     u_char* bp = (u_char*)sendbuf_.end();
00692     *bp++ = DATA_SEGMENT;
00693     int cc = SDNV::encode(segment_len, bp, sendbuf_.tailbytes() - 1);
00694     ASSERT(cc == (int)sdnv_len);
00695     bp += sdnv_len;
00696     
00697     sendbuf_.fill(1 + sdnv_len);
00698     send_segment_todo_ = segment_len;
00699 
00700     // send_data_todo actually does the deed
00701     return send_data_todo(inflight);
00702 }
00703 
00704 //----------------------------------------------------------------------
00705 bool
00706 StreamConvergenceLayer::Connection::send_data_todo(InFlightBundle* inflight)
00707 {
00708     ASSERT(send_segment_todo_ != 0);
00709     
00710     while (send_segment_todo_ != 0 && sendbuf_.tailbytes() != 0) {
00711         u_char* bp = (u_char*)sendbuf_.end();
00712         
00713         size_t bytes_sent     = inflight->sent_data_.last() + 1;
00714         size_t payload_offset = bytes_sent - inflight->header_block_length_;
00715         size_t send_len       = std::min(send_segment_todo_, sendbuf_.tailbytes());
00716     
00717         Bundle* bundle = inflight->bundle_.object();
00718         bundle->payload_.read_data(payload_offset, send_len, bp,
00719                                    BundlePayload::FORCE_COPY);
00720         sendbuf_.fill(send_len);
00721     
00722         inflight->sent_data_.set(bytes_sent, send_len);
00723     
00724         log_debug("send_data_todo: "
00725                   "sent %zu/%zu of current segment from payload offset %zu "
00726                   "(%zu todo), updated sent_data *%p",
00727                   send_len, send_segment_todo_, payload_offset,
00728                   send_segment_todo_ - send_len, &inflight->sent_data_);
00729 
00730         send_segment_todo_ -= send_len;
00731 
00732         note_data_sent();
00733         send_data();
00734         
00735         if (contact_broken_)
00736             return true;
00737     }
00738 
00739     return (send_segment_todo_ == 0);
00740 }
00741 
00742 //----------------------------------------------------------------------
00743 bool
00744 StreamConvergenceLayer::Connection::finish_bundle(InFlightBundle* inflight)
00745 {
00746     if (sendbuf_.tailbytes() == 0) {
00747         log_warn("finish_bundle: rare out of space condition... "
00748                  "making room for one byte");
00749         sendbuf_.reserve(1);
00750         ASSERT(sendbuf_.tailbytes() > 0);
00751     }
00752     
00753     log_debug("finish_bundle: sending END_BUNDLE message");
00754     
00755     *sendbuf_.end() = END_BUNDLE;
00756     sendbuf_.fill(1);
00757     
00758     note_data_sent();
00759     send_data();
00760 
00761     ASSERT(! inflight->bundle_->payload_.is_file_open());
00762     
00763     ASSERT(current_inflight_ == inflight);
00764     current_inflight_ = NULL;
00765     
00766     check_completed(inflight);
00767     
00768     return true;
00769 }
00770 
00771 //----------------------------------------------------------------------
00772 void
00773 StreamConvergenceLayer::Connection::check_completed(InFlightBundle* inflight)
00774 {
00775     // we can pop the inflight bundle off of the queue and clean it up
00776     // only when both finish_bundle is called (so current_inflight_ no
00777     // longer points to the inflight bundle), and after the final ack
00778     // for the bundle has been received (determined by looking at
00779     // inflight->ack_data_)
00780 
00781     if (current_inflight_ == inflight) {
00782         log_debug("check_completed: bundle %d still waiting for finish_bundle",
00783                   inflight->bundle_->bundleid_);
00784         return;
00785     }
00786 
00787     size_t acked_len = inflight->ack_data_.num_contiguous();
00788     if (acked_len < inflight->formatted_length_) {
00789         log_debug("check_completed: bundle %d only acked %zu/%zu",
00790                   inflight->bundle_->bundleid_,
00791                   acked_len, inflight->formatted_length_);
00792         return;
00793     }
00794 
00795     log_debug("check_completed: bundle %d transmission complete",
00796               inflight->bundle_->bundleid_);
00797     ASSERT(inflight == inflight_.front());
00798     inflight_.pop_front();
00799     delete inflight;
00800 }
00801 
00802 //----------------------------------------------------------------------
00803 void
00804 StreamConvergenceLayer::Connection::send_keepalive()
00805 {
00806     // there's no point in putting another byte in the buffer if
00807     // there's already data waiting to go out, since the arrival of
00808     // that data on the other end will do the same job as the
00809     // keepalive byte
00810     if (sendbuf_.fullbytes() != 0) {
00811         log_debug("send_keepalive: "
00812                   "send buffer has %zu bytes queued, suppressing keepalive",
00813                   sendbuf_.fullbytes());
00814         return;
00815     }
00816     ASSERT(sendbuf_.tailbytes() > 0);
00817 
00818     ::gettimeofday(&keepalive_sent_, 0);
00819 
00820     *(sendbuf_.end()) = KEEPALIVE;
00821     sendbuf_.fill(1);
00822 
00823     // don't note_data_sent() here since keepalive messages shouldn't
00824     // be counted for keeping an idle link open
00825     send_data();
00826 }
00827 //----------------------------------------------------------------------
00828 void
00829 StreamConvergenceLayer::Connection::handle_cancel_bundle(Bundle* bundle)
00830 {
00831     (void)bundle;
00832 }
00833 
00834 //----------------------------------------------------------------------
00835 void
00836 StreamConvergenceLayer::Connection::handle_poll_timeout()
00837 {
00838     struct timeval now;
00839     u_int elapsed, elapsed2;
00840 
00841     StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(params_);
00842     ASSERT(params != NULL);
00843     
00844     ::gettimeofday(&now, 0);
00845     
00846     // check that it hasn't been too long since we got some data from
00847     // the other side
00848     elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
00849     if (elapsed > params->data_timeout_) {
00850         log_info("handle_poll_timeout: no data heard for %d msecs "
00851                  "(keepalive_sent %u.%u, data_rcvd %u.%u, now %u.%u) "
00852                  "-- closing contact",
00853                  elapsed,
00854                  (u_int)keepalive_sent_.tv_sec,
00855                  (u_int)keepalive_sent_.tv_usec,
00856                  (u_int)data_rcvd_.tv_sec, (u_int)data_rcvd_.tv_usec,
00857                  (u_int)now.tv_sec, (u_int)now.tv_usec);
00858             
00859         break_contact(ContactEvent::BROKEN);
00860         return;
00861     }
00862 
00863     // check if the connection has been idle for too long
00864     // (on demand links only)
00865     if (contact_->link()->type() == Link::ONDEMAND) {
00866         u_int idle_close_time = contact_->link()->params().idle_close_time_;
00867         
00868         elapsed  = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
00869         elapsed2 = TIMEVAL_DIFF_MSEC(now, data_sent_);
00870         
00871         if (idle_close_time != 0 &&
00872             (elapsed > idle_close_time * 1000) &&
00873             (elapsed2 > idle_close_time * 1000))
00874         {
00875             log_info("closing idle connection "
00876                      "(no data received for %d msecs or sent for %d msecs)",
00877                      elapsed, elapsed2);
00878             break_contact(ContactEvent::IDLE);
00879             return;
00880         } else {
00881             log_debug("connection not idle: recvd %d / sent %d <= timeout %d",
00882                       elapsed, elapsed2, idle_close_time * 1000);
00883         }
00884     }
00885     
00886     // check if it's time for us to send a keepalive (i.e. that we
00887     // haven't sent some data or another keepalive in at least the
00888     // configured keepalive_interval)
00889     if (params->keepalive_interval_ != 0) {
00890         elapsed  = TIMEVAL_DIFF_MSEC(now, data_sent_);
00891         elapsed2 = TIMEVAL_DIFF_MSEC(now, keepalive_sent_);
00892 
00893         if (std::min(elapsed, elapsed2) > (params->keepalive_interval_ * 1000))
00894         {
00895             log_debug("handle_poll_timeout: sending keepalive");
00896             send_keepalive();
00897         }
00898     }
00899 }
00900 
00901 //----------------------------------------------------------------------
00902 void
00903 StreamConvergenceLayer::Connection::process_data()
00904 {
00905     if (recvbuf_.fullbytes() == 0) {
00906         return;
00907     }
00908 
00909     log_debug("processing up to %zu bytes from receive buffer",
00910               recvbuf_.fullbytes());
00911 
00912     // all data (keepalives included) should be noted since it's used
00913     // for generating new keepalives
00914     note_data_rcvd();
00915 
00916     // the first thing we need to do is handle the contact initiation
00917     // sequence, i.e. the contact header and the announce bundle. we
00918     // know we need to do this if we haven't yet called contact_up()
00919     if (! contact_up_) {
00920         handle_contact_initiation();
00921         return;
00922     }
00923 
00924     // if a data segment is bigger than the receive buffer. when
00925     // processing a data segment, we mark the unread amount in the
00926     // data_segment_remaining_ field so if that's not zero, we need to
00927     // drain it, then fall through to handle the rest of the buffer
00928     if (recv_segment_todo_ != 0) {
00929         bool ok = handle_data_todo();
00930         
00931         if (!ok) {
00932             return;
00933         }
00934     }
00935     
00936     // now, drain cl messages from the receive buffer. we peek at the
00937     // first byte and dispatch to the correct handler routine
00938     // depending on the type of the CL message. we don't consume the
00939     // byte yet since there's a possibility that we need to read more
00940     // from the remote side to handle the whole message
00941     while (recvbuf_.fullbytes() != 0) {
00942         if (contact_broken_) return;
00943         
00944         char type = *recvbuf_.start();
00945 
00946         log_debug("recvbuf has %zu full bytes, dispatching to handler routine",
00947                   recvbuf_.fullbytes());
00948         bool ok;
00949         switch (type) {
00950         case START_BUNDLE:
00951             ok = handle_start_bundle();
00952             break;
00953         case END_BUNDLE:
00954             ok = handle_end_bundle();
00955             break;
00956         case DATA_SEGMENT:
00957             ok = handle_data_segment();
00958             break;
00959         case ACK_SEGMENT:
00960             ok = handle_ack_segment();
00961             break;
00962         case KEEPALIVE:
00963             ok = handle_keepalive();
00964             break;
00965         case SHUTDOWN:
00966             ok = handle_shutdown();
00967             break;
00968         default:
00969             log_err("invalid CL message type 0x%x", type);
00970             break_contact(ContactEvent::BROKEN);
00971             return;
00972         }
00973 
00974         // if there's not enough data in the buffer to handle the
00975         // message, make sure there's space to receive more
00976         if (! ok) {
00977             if (recvbuf_.fullbytes() == recvbuf_.size()) {
00978                 log_warn("process_data: "
00979                          "%zu byte recv buffer full but too small for msg %u... "
00980                          "doubling buffer size",
00981                          recvbuf_.size(), type);
00982                 
00983                 recvbuf_.reserve(recvbuf_.size() * 2);
00984 
00985             } else if (recvbuf_.tailbytes() == 0) {
00986                 // force it to move the full bytes up to the front
00987                 recvbuf_.reserve(recvbuf_.size() - recvbuf_.fullbytes());
00988                 ASSERT(recvbuf_.tailbytes() != 0);
00989             }
00990             
00991             return;
00992         }
00993     }
00994 }
00995 
00996 //----------------------------------------------------------------------
00997 void
00998 StreamConvergenceLayer::Connection::note_data_rcvd()
00999 {
01000     log_debug("noting data_rcvd");
01001     ::gettimeofday(&data_rcvd_, 0);
01002 }
01003 
01004 //----------------------------------------------------------------------
01005 void
01006 StreamConvergenceLayer::Connection::note_data_sent()
01007 {
01008     log_debug("noting data_sent");
01009     ::gettimeofday(&data_sent_, 0);
01010 }
01011 
01012 //----------------------------------------------------------------------
01013 bool
01014 StreamConvergenceLayer::Connection::handle_start_bundle()
01015 {
01016     recvbuf_.consume(1);
01017 
01018     if (! incoming_.empty()) {
01019         IncomingBundle* incoming = incoming_.back();
01020         if (incoming->total_length_ == 0) {
01021             log_err("protocol error: got START_BUNDLE before bundle completed");
01022             break_contact(ContactEvent::BROKEN);
01023             return false;
01024         }
01025     }
01026         
01027     log_debug("got START_BUNDLE message, creating new IncomingBundle");
01028     IncomingBundle* incoming = new IncomingBundle(NULL);
01029     incoming_.push_back(incoming);
01030     
01031     return true;
01032 }
01033 
01034 //----------------------------------------------------------------------
01035 bool
01036 StreamConvergenceLayer::Connection::handle_end_bundle()
01037 {
01038     recvbuf_.consume(1);
01039     
01040     if (incoming_.empty()) {
01041         log_err("protocol error: got END_BUNDLE with no incoming bundle");
01042         // XXX/demmer protocol error
01043         break_contact(ContactEvent::BROKEN);
01044         return false;
01045     }
01046     
01047     IncomingBundle* incoming = incoming_.back();
01048 
01049     if (incoming->rcvd_data_.empty()) {
01050         log_err("protocol error: got END_BUNDLE with no DATA_SEGMENT");
01051 
01052         // XXX/demmer protocol error
01053         break_contact(ContactEvent::BROKEN);
01054         return false;
01055     }
01056 
01057     // note that we could get an END_BUNDLE message before the whole
01058     // payload has been transmitted, in case the other side is doing
01059     // some fancy reordering. we mark the total amount in the
01060     // IncomingBundle structure so send_pending_acks knows when we're
01061     // done.
01062     
01063     incoming->total_length_ = incoming->rcvd_data_.last() + 1;
01064     
01065     size_t formatted_len =
01066         BundleProtocol::formatted_length(incoming->bundle_.object());
01067 
01068     log_debug("got END_BUNDLE: rcvd %zu / %zu",
01069               incoming->total_length_, formatted_len);
01070     
01071     if (incoming->total_length_ > formatted_len) {
01072         log_err("protocol error: received too much data -- "
01073                 "got %zu, formatted length %zu",
01074                 incoming->total_length_, formatted_len);
01075 
01076         // we pretend that we got nothing so the cleanup code in
01077         // CLConnection::close_contact doesn't try to post a received
01078         // event for the bundle
01079         incoming->rcvd_data_.clear();
01080 
01081         // XXX/demmer protocol error
01082         break_contact(ContactEvent::BROKEN);
01083         return false;
01084     }
01085 
01086     // XXX/demmer need to fix the fragmentation code to assume the
01087     // event includes the header bytes as well as the payload.
01088     size_t payload_rcvd = incoming->total_length_ -
01089                           incoming->header_block_length_;
01090 
01091     incoming->bundle_->payload_.close_file();
01092     
01093     BundleDaemon::post(
01094         new BundleReceivedEvent(incoming->bundle_.object(),
01095                                 EVENTSRC_PEER,
01096                                 payload_rcvd));
01097 
01098     return true;
01099 }
01100 
01101 //----------------------------------------------------------------------
01102 bool
01103 StreamConvergenceLayer::Connection::handle_data_segment()
01104 {
01105     if (incoming_.empty()) {
01106         log_err("protocol error: got data segment before START_BUNDLE");
01107         break_contact(ContactEvent::BROKEN);
01108         return false;
01109     }
01110 
01111     u_char* bp = (u_char*)recvbuf_.start();
01112 
01113     size_t segment_offset;
01114     u_int32_t segment_len;
01115     
01116     int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, &segment_len);
01117 
01118     if (sdnv_len < 0) {
01119         log_debug("handle_data_segment: too few bytes for sdnv (%zu)",
01120                   recvbuf_.fullbytes());
01121         return false;
01122     }
01123 
01124     // Note that there may be more than one incoming bundle on the
01125     // IncomingList, but it's the one at the back that we're reading
01126     // in data for. Others are waiting for acks to be sent.
01127     IncomingBundle* incoming = incoming_.back();
01128     
01129     // if this is the first segment, try to parse the headers and create
01130     // the bundle structure. if we can't handle it fully, we leave the
01131     // partial header bytes in the buffer (potentially increasing its
01132     // size) and return to wait for more data to arrive.
01133     if (incoming->rcvd_data_.empty()) {
01134         Bundle* bundle = new Bundle();
01135 
01136         size_t rcvd_bytes = recvbuf_.fullbytes() - 1 - sdnv_len;
01137         int header_len = BundleProtocol::parse_header_blocks(bundle,
01138                                                              bp + 1 + sdnv_len,
01139                                                              rcvd_bytes);
01140         if (header_len < 0) {
01141             log_debug("handle_data_segment: "
01142                       "not enough data to parse bundle headers [have %zu]",
01143                       recvbuf_.fullbytes());
01144             delete bundle;
01145             return false;
01146         }
01147 
01148         log_debug("handle_data_segment: "
01149                   "got header segments [len %u] for bundle *%p",
01150                   header_len, bundle);
01151         
01152         incoming->bundle_ = bundle;
01153         incoming->header_block_length_ = header_len;
01154         recvbuf_.consume(1 + sdnv_len + header_len);
01155         incoming->rcvd_data_.set(0, header_len);
01156 
01157         segment_offset = 0;
01158         recv_segment_todo_ = segment_len - header_len;
01159 
01160     } else {
01161         // this is a chunk of payload and/or tail block
01162         segment_offset          = incoming->rcvd_data_.num_contiguous();
01163         size_t payload_offset = segment_offset - incoming->header_block_length_;
01164         (void)payload_offset;
01165 
01166         log_debug("handle_data_segment: "
01167                   "got segment of length %u at offset %zu "
01168                   "(payload offset %zu/%zu)",
01169                   segment_len, segment_offset, payload_offset,
01170                   incoming->bundle_->payload_.length());
01171         
01172         if (segment_offset != incoming->ack_data_.last() + 1) {
01173             log_err("XXX/demmer segment offset %zu last ack %zu",
01174                     segment_offset, incoming->ack_data_.last());
01175         }
01176         
01177         recvbuf_.consume(1 + sdnv_len);
01178         
01179         recv_segment_todo_ = segment_len;
01180     }
01181     
01182     incoming->ack_data_.set(segment_offset);
01183     incoming->ack_data_.set(segment_offset + segment_len - 1);
01184 
01185     log_debug("handle_data_segment: "
01186               "updated recv_data (segment_offset %zu) *%p ack_data *%p",
01187               segment_offset, &incoming->rcvd_data_, &incoming->ack_data_);
01188 
01189     if (recv_segment_todo_ != 0) {
01190         handle_data_todo();
01191     }
01192     
01193     return true;
01194 }
01195 
01196 //----------------------------------------------------------------------
01197 bool
01198 StreamConvergenceLayer::Connection::handle_data_todo()
01199 {
01200     // We shouldn't get ourselves here unless there's something
01201     // incoming and there's something left to read
01202     ASSERT(!incoming_.empty());
01203     ASSERT(recv_segment_todo_ != 0);
01204     
01205     // Note that there may be more than one incoming bundle on the
01206     // IncomingList. There's always only one (at the back) that we're
01207     // reading in data for, the rest are waiting for acks to go out
01208     IncomingBundle* incoming = incoming_.back();
01209 
01210     size_t rcvd_offset    = incoming->rcvd_data_.num_contiguous();
01211     size_t rcvd_len       = recvbuf_.fullbytes();
01212     size_t payload_offset = rcvd_offset - incoming->header_block_length_;
01213     size_t payload_len    = incoming->bundle_->payload_.length();
01214     size_t chunk_len      = std::min(rcvd_len, recv_segment_todo_);
01215 
01216     if (rcvd_len == 0) {
01217         return false; // nothing to do
01218     }
01219     
01220     log_debug("handle_data_todo: "
01221               "reading todo segment %zu/%zu at payload offset %zu/%zu",
01222               chunk_len, recv_segment_todo_, payload_offset, payload_len);
01223     
01224     if (chunk_len + payload_offset > payload_len) {
01225         log_err("NEED TO HANDLE BLOCKS AFTER THE PAYLOAD");
01226         break_contact(ContactEvent::BROKEN);
01227         return false;
01228     }
01229 
01230     u_char* bp = (u_char*)recvbuf_.start();
01231     incoming->bundle_->payload_.write_data(bp, payload_offset, chunk_len);
01232 
01233     recv_segment_todo_ -= chunk_len;
01234     recvbuf_.consume(chunk_len);
01235 
01236     incoming->rcvd_data_.set(rcvd_offset, chunk_len);
01237     
01238     log_debug("handle_data_todo: "
01239               "updated recv_data (rcvd_offset %zu) *%p ack_data *%p",
01240               rcvd_offset, &incoming->rcvd_data_, &incoming->ack_data_);
01241     
01242     if (recv_segment_todo_ == 0) {
01243         return true; // completed segment
01244     }
01245 
01246     return false;
01247 }
01248 
01249 //----------------------------------------------------------------------
01250 bool
01251 StreamConvergenceLayer::Connection::handle_ack_segment()
01252 {
01253     u_char* bp = (u_char*)recvbuf_.start();
01254     u_int32_t acked_len;
01255     int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, &acked_len);
01256     
01257     if (sdnv_len < 0) {
01258         log_debug("handle_ack_segment: too few bytes for sdnv (%zu)",
01259                   recvbuf_.fullbytes());
01260         return false;
01261     }
01262 
01263     recvbuf_.consume(1 + sdnv_len);
01264 
01265     if (inflight_.empty()) {
01266         log_err("protocol error: got ack segment with no inflight bundle");
01267         break_contact(ContactEvent::BROKEN);
01268         return false;
01269     }
01270 
01271     InFlightBundle* inflight = inflight_.front();
01272 
01273     size_t ack_begin;
01274     DataBitmap::iterator i = inflight->ack_data_.begin();
01275     if (i == inflight->ack_data_.end()) {
01276         ack_begin = 0;
01277     } else {
01278         i.skip_contiguous();
01279         ack_begin = *i + 1;
01280     }
01281 
01282     if (acked_len < ack_begin) {
01283         log_err("protocol error: got ack for length %u but already acked up to %zu",
01284                 acked_len, ack_begin);
01285         break_contact(ContactEvent::BROKEN);
01286         return false;
01287     }
01288     
01289     inflight->ack_data_.set(0, acked_len);
01290 
01291     // now check if this was the last ack for the bundle, in which
01292     // case we can pop it off the list and post a
01293     // BundleTransmittedEvent
01294     if (acked_len == inflight->formatted_length_) {
01295         log_debug("handle_ack_segment: "
01296                   "got final ack for %zu byte range -- acked_len %u, ack_data *%p",
01297                   (size_t)acked_len - ack_begin, acked_len, &inflight->ack_data_);
01298 
01299         // XXX/demmer change this event to include the header bytes
01300         
01301         BundleDaemon::post(
01302             new BundleTransmittedEvent(inflight->bundle_.object(),
01303                                        contact_,
01304                                        inflight->sent_data_.num_contiguous() -
01305                                          inflight->header_block_length_,
01306                                        inflight->ack_data_.num_contiguous() -
01307                                          inflight->header_block_length_));
01308 
01309         // might delete inflight
01310         check_completed(inflight);
01311         
01312     } else {
01313         log_debug("handle_ack_segment: "
01314                   "got acked_len %u (%zu byte range) -- ack_data *%p",
01315                   acked_len, (size_t)acked_len - ack_begin, &inflight->ack_data_);
01316     }
01317 
01318     return true;
01319 }
01320 
01321 //----------------------------------------------------------------------
01322 bool
01323 StreamConvergenceLayer::Connection::handle_keepalive()
01324 {
01325     log_debug("got keepalive message");
01326     recvbuf_.consume(1);
01327     return true;
01328 }
01329 
01330 //----------------------------------------------------------------------
01331 bool
01332 StreamConvergenceLayer::Connection::handle_shutdown()
01333 {
01334     log_debug("got SHUTDOWN byte");
01335     recvbuf_.consume(1);
01336     break_contact(ContactEvent::SHUTDOWN);
01337     
01338     return false;
01339 }
01340 
01341 } // namespace dtn

Generated on Fri Dec 22 14:48:00 2006 for DTN Reference Implementation by  doxygen 1.5.1