00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <oasys/util/OptParser.h>
00019
00020 #include "CLConnection.h"
00021 #include "bundling/BundleDaemon.h"
00022 #include "bundling/BundlePayload.h"
00023 #include "contacts/ContactManager.h"
00024
00025 namespace dtn {
00026
00027
00028 CLConnection::CLConnection(const char* classname,
00029 const char* logpath,
00030 ConnectionConvergenceLayer* cl,
00031 LinkParams* params,
00032 bool active_connector)
00033 : Thread(classname),
00034 Logger(classname, logpath),
00035 contact_(classname),
00036 contact_up_(false),
00037 cmdqueue_(logpath),
00038 cl_(cl),
00039 params_(params),
00040 active_connector_(active_connector),
00041 num_pollfds_(0),
00042 poll_timeout_(-1),
00043 contact_broken_(false),
00044 num_pending_(0)
00045 {
00046 sendbuf_.reserve(params_->sendbuf_len_);
00047 recvbuf_.reserve(params_->recvbuf_len_);
00048 }
00049
00050
00051 CLConnection::~CLConnection()
00052 {
00053 }
00054
00055
00056 void
00057 CLConnection::run()
00058 {
00059 struct pollfd* cmdqueue_poll;
00060
00061 initialize_pollfds();
00062
00063 cmdqueue_poll = &pollfds_[num_pollfds_];
00064 cmdqueue_poll->fd = cmdqueue_.read_fd();
00065 cmdqueue_poll->events = POLLIN;
00066
00067
00068
00069
00070
00071 if (active_connector_) {
00072 connect();
00073 } else {
00074 accept();
00075 }
00076
00077 while (true) {
00078 if (contact_broken_) {
00079 log_debug("contact_broken set, exiting main loop");
00080 return;
00081 }
00082
00083
00084
00085
00086
00087 if (cmdqueue_.size() != 0) {
00088 process_command();
00089 continue;
00090 }
00091
00092
00093
00094
00095
00096 bool more_to_send = send_pending_data();
00097
00098
00099
00100 if (contact_broken_) {
00101 log_debug("contact_broken set, exiting main loop");
00102 return;
00103 }
00104
00105
00106
00107
00108
00109 for (int i = 0; i < num_pollfds_ + 1; ++i) {
00110 pollfds_[i].revents = 0;
00111 }
00112
00113 int timeout = more_to_send ? 0 : poll_timeout_;
00114
00115 log_debug("calling poll on %d fds with timeout %d",
00116 num_pollfds_ + 1, timeout);
00117
00118 int cc = oasys::IO::poll_multiple(pollfds_, num_pollfds_ + 1,
00119 timeout, NULL, logpath_);
00120
00121 if (cc == oasys::IOTIMEOUT)
00122 {
00123 handle_poll_timeout();
00124 }
00125 else if (cc > 0)
00126 {
00127 if (cc == 1 && cmdqueue_poll->revents != 0) {
00128 continue;
00129 }
00130 handle_poll_activity();
00131 }
00132 else
00133 {
00134 log_err("unexpected return from poll_multiple: %d", cc);
00135 break_contact(ContactEvent::BROKEN);
00136 return;
00137 }
00138 }
00139 }
00140
00141
00142 void
00143 CLConnection::queue_bundle(Bundle* bundle)
00144 {
00145
00146
00147
00148
00149
00150
00151
00152 LinkParams* params = dynamic_cast<LinkParams*>(contact_->link()->cl_info());
00153 ASSERT(params != NULL);
00154
00155 oasys::atomic_incr(&num_pending_);
00156
00157 if (num_pending_.value >= params->busy_queue_depth_)
00158 {
00159 log_debug("%d bundles pending, setting BUSY state",
00160 num_pending_.value);
00161 contact_->link()->set_state(Link::BUSY);
00162 }
00163 else
00164 {
00165 log_debug("%d bundles pending -- leaving state as-is",
00166 num_pending_.value);
00167 }
00168
00169 cmdqueue_.push_back(
00170 CLConnection::CLMsg(CLConnection::CLMSG_SEND_BUNDLE, bundle));
00171 }
00172
00173
00174 void
00175 CLConnection::process_command()
00176 {
00177 CLMsg msg;
00178 bool ok = cmdqueue_.try_pop(&msg);
00179 ASSERT(ok);
00180
00181 switch(msg.type_) {
00182 case CLMSG_SEND_BUNDLE:
00183 log_debug("processing CLMSG_SEND_BUNDLE");
00184 handle_send_bundle(msg.bundle_.object());
00185 break;
00186
00187 case CLMSG_CANCEL_BUNDLE:
00188 log_debug("processing CLMSG_CANCEL_BUNDLE");
00189 handle_cancel_bundle(msg.bundle_.object());
00190 break;
00191
00192 case CLMSG_BREAK_CONTACT:
00193 log_debug("processing CLMSG_BREAK_CONTACT");
00194 break_contact(ContactEvent::USER);
00195 break;
00196 default:
00197 PANIC("invalid CLMsg typecode %d", msg.type_);
00198 }
00199 }
00200
00201
00202 void
00203 CLConnection::check_unblock_link()
00204 {
00205
00206
00207
00208
00209
00210
00211 LinkParams* params = dynamic_cast<LinkParams*>(contact_->link()->cl_info());
00212 ASSERT(params != NULL);
00213
00214 oasys::atomic_decr(&num_pending_);
00215 ASSERT((int)num_pending_.value >= 0);
00216
00217 if (contact_->link()->state() == Link::BUSY)
00218 {
00219 if (num_pending_.value == (params->busy_queue_depth_ - 1))
00220 {
00221 log_debug("%d bundles pending, clearing BUSY state", num_pending_.value);
00222
00223
00224
00225
00226
00227
00228 BundleDaemon::post_at_head(
00229 new LinkStateChangeRequest(contact_->link(),
00230 Link::AVAILABLE,
00231 ContactEvent::UNBLOCKED));
00232 }
00233 else
00234 {
00235 log_debug("%d bundles pending, leaving state as-is",
00236 num_pending_.value);
00237 }
00238 }
00239 }
00240
00241
00242 void
00243 CLConnection::contact_up()
00244 {
00245 log_debug("contact_up");
00246 ASSERT(contact_ != NULL);
00247
00248 ASSERT(!contact_up_);
00249 contact_up_ = true;
00250
00251 BundleDaemon::post(new ContactUpEvent(contact_));
00252 }
00253
00254
00255 void
00256 CLConnection::break_contact(ContactEvent::reason_t reason)
00257 {
00258 log_debug("break_contact: %s", ContactEvent::reason_to_str(reason));
00259
00260 if (reason != ContactEvent::BROKEN) {
00261 disconnect();
00262 }
00263
00264 contact_broken_ = true;
00265
00266
00267
00268
00269
00270
00271
00272
00273 if ((reason != ContactEvent::USER) && (contact_ != NULL)) {
00274 BundleDaemon::post(
00275 new LinkStateChangeRequest(contact_->link(),
00276 Link::CLOSED,
00277 reason));
00278 }
00279 }
00280
00281
00282 void
00283 CLConnection::close_contact()
00284 {
00285 LinkParams* params = dynamic_cast<LinkParams*>(contact_->link()->cl_info());
00286 ASSERT(params != NULL);
00287
00288
00289
00290 while (! inflight_.empty()) {
00291 InFlightBundle* inflight = inflight_.front();
00292 u_int32_t sent_bytes = inflight->sent_data_.num_contiguous();
00293 u_int32_t acked_bytes = inflight->ack_data_.num_contiguous();
00294
00295 if ((! params->reactive_frag_enabled_) ||
00296 (sent_bytes == 0) ||
00297 (contact_->link()->is_reliable() && acked_bytes == 0))
00298 {
00299 log_debug("posting transmission failed event "
00300 "(reactive fragmentation %s, %s link, acked_bytes %u)",
00301 params->reactive_frag_enabled_ ? "enabled" : "disabled",
00302 contact_->link()->is_reliable() ? "reliable" : "unreliable",
00303 acked_bytes);
00304
00305 BundleDaemon::post(
00306 new BundleTransmitFailedEvent(inflight->bundle_.object(),
00307 contact_, contact_->link()));
00308
00309 } else {
00310 BundleDaemon::post(
00311 new BundleTransmittedEvent(inflight->bundle_.object(),
00312 contact_, contact_->link(),
00313 sent_bytes, acked_bytes));
00314 }
00315
00316 inflight_.pop_front();
00317 }
00318
00319
00320
00321
00322 if (! incoming_.empty()) {
00323 IncomingBundle* incoming = incoming_.back();
00324 if(!incoming->rcvd_data_.empty())
00325 {
00326 size_t rcvd_len = incoming->rcvd_data_.last() + 1;
00327
00328 size_t header_block_length =
00329 BundleProtocol::payload_offset(&incoming->bundle_->recv_blocks_);
00330
00331 if ((incoming->total_length_ == 0) &&
00332 params->reactive_frag_enabled_ &&
00333 (rcvd_len > header_block_length))
00334 {
00335 log_debug("partial arrival of bundle: "
00336 "got %zu bytes [hdr %zu payload %zu]",
00337 rcvd_len, header_block_length,
00338 incoming->bundle_->payload_.length());
00339
00340 BundleDaemon::post(
00341 new BundleReceivedEvent(incoming->bundle_.object(),
00342 EVENTSRC_PEER, rcvd_len,
00343 contact_.object()));
00344 }
00345 }
00346 }
00347
00348
00349 while (!incoming_.empty()) {
00350 IncomingBundle* incoming = incoming_.back();
00351 incoming_.pop_back();
00352 delete incoming;
00353 }
00354
00355
00356
00357
00358 if (cmdqueue_.size() > 0) {
00359 log_warn("close_contact: %zu CL commands still in queue: ",
00360 cmdqueue_.size());
00361
00362 while (cmdqueue_.size() != 0) {
00363 CLMsg msg;
00364 bool ok = cmdqueue_.try_pop(&msg);
00365 ASSERT(ok);
00366
00367 log_warn("close_contact: %s still in queue", clmsg_to_str(msg.type_));
00368
00369 if (msg.type_ == CLMSG_SEND_BUNDLE) {
00370 BundleDaemon::post(
00371 new BundleTransmitFailedEvent(msg.bundle_.object(),
00372 contact_,
00373 contact_->link()));
00374
00375 }
00376 }
00377 }
00378 }
00379
00380
00381 void
00382 CLConnection::find_contact(const EndpointID& peer_eid)
00383 {
00384
00385
00386
00387
00388
00389
00390
00391
00392
00393
00394
00395
00396 if (contact_ == NULL) {
00397
00398 ASSERT(nexthop_ != "");
00399
00400
00401 ContactManager* cm = BundleDaemon::instance()->contactmgr();
00402 oasys::ScopeLock l(cm->lock(), "CLConnection::find_contact");
00403
00404 Link* link = cm->find_link_to(cl_, "", peer_eid,
00405 Link::OPPORTUNISTIC,
00406 Link::AVAILABLE | Link::UNAVAILABLE);
00407
00408
00409 if (link != NULL && (link->contact() == NULL)) {
00410 link->set_nexthop(nexthop_);
00411 log_debug("found idle opportunistic link *%p", link);
00412
00413 } else {
00414 if (link != NULL) {
00415 log_warn("in-use opportunistic link *%p returned from "
00416 "ContactManager::find_link_to", link);
00417 }
00418
00419 link = cm->new_opportunistic_link(cl_,
00420 nexthop_.c_str(),
00421 peer_eid);
00422 log_debug("created new opportunistic link *%p", link);
00423 }
00424
00425 ASSERT(! link->isopen());
00426
00427 contact_ = new Contact(link);
00428 contact_->set_cl_info(this);
00429 link->set_contact(contact_.object());
00430
00431
00432
00433
00434
00435
00436 LinkParams* lparams = dynamic_cast<LinkParams*>(link->cl_info());
00437 ASSERT(lparams != NULL);
00438 params_ = lparams;
00439 }
00440 }
00441
00442
00443 }