00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include <oasys/util/OptParser.h>
00040
00041 #include "CLConnection.h"
00042 #include "bundling/BundleDaemon.h"
00043 #include "bundling/BundlePayload.h"
00044 #include "contacts/ContactManager.h"
00045
00046 namespace dtn {
00047
00048
00049 CLConnection::CLConnection(const char* classname,
00050 const char* logpath,
00051 ConvergenceLayer* cl,
00052 LinkParams* params,
00053 bool active_connector)
00054 : Thread(classname),
00055 Logger(classname, logpath),
00056 contact_(classname),
00057 contact_up_(false),
00058 cmdqueue_(logpath),
00059 cl_(cl),
00060 params_(params),
00061 active_connector_(active_connector),
00062 num_pollfds_(0),
00063 poll_timeout_(-1),
00064 contact_broken_(false)
00065 {
00066 sendbuf_.reserve(params_->sendbuf_len_);
00067 recvbuf_.reserve(params_->recvbuf_len_);
00068 }
00069
00070
00071 CLConnection::~CLConnection()
00072 {
00073 }
00074
00075
00076 void
00077 CLConnection::run()
00078 {
00079 struct pollfd* cmdqueue_poll;
00080
00081 initialize_pollfds();
00082
00083 cmdqueue_poll = &pollfds_[num_pollfds_];
00084 cmdqueue_poll->fd = cmdqueue_.read_fd();
00085 cmdqueue_poll->events = POLLIN;
00086
00087
00088
00089
00090
00091 if (active_connector_) {
00092 connect();
00093 } else {
00094 accept();
00095 }
00096
00097 while (true) {
00098 if (contact_broken_) {
00099 log_debug("contact_broken set, exiting main loop");
00100 return;
00101 }
00102
00103
00104
00105
00106
00107 if (cmdqueue_.size() != 0) {
00108 process_command();
00109 continue;
00110 }
00111
00112
00113
00114
00115
00116 bool more_to_send = send_pending_data();
00117
00118
00119
00120 if (contact_broken_) {
00121 log_debug("contact_broken set, exiting main loop");
00122 return;
00123 }
00124
00125
00126
00127
00128
00129 for (int i = 0; i < num_pollfds_ + 1; ++i) {
00130 pollfds_[i].revents = 0;
00131 }
00132
00133 int timeout = more_to_send ? 0 : poll_timeout_;
00134
00135 int cc = oasys::IO::poll_multiple(pollfds_, num_pollfds_ + 1,
00136 timeout, NULL, logpath_);
00137
00138 if (cc == oasys::IOTIMEOUT)
00139 {
00140 handle_poll_timeout();
00141 }
00142 else if (cc > 0)
00143 {
00144 if (cc == 1 && cmdqueue_poll->revents != 0) {
00145 continue;
00146 }
00147 handle_poll_activity();
00148 }
00149 else
00150 {
00151 log_err("unexpected return from poll_multiple: %d", cc);
00152 break_contact(ContactEvent::BROKEN);
00153 return;
00154 }
00155 }
00156 }
00157
00158
00159 void
00160 CLConnection::process_command()
00161 {
00162 CLMsg msg;
00163 bool ok = cmdqueue_.try_pop(&msg);
00164 ASSERT(ok);
00165
00166 switch(msg.type_) {
00167 case CLMSG_SEND_BUNDLE:
00168 log_debug("processing CLMSG_SEND_BUNDLE");
00169 handle_send_bundle(msg.bundle_.object());
00170 break;
00171
00172 case CLMSG_CANCEL_BUNDLE:
00173 log_debug("processing CLMSG_CANCEL_BUNDLE");
00174 handle_cancel_bundle(msg.bundle_.object());
00175 break;
00176
00177 case CLMSG_BREAK_CONTACT:
00178 log_debug("processing CLMSG_BREAK_CONTACT");
00179 break_contact(ContactEvent::USER);
00180 break;
00181 default:
00182 PANIC("invalid CLMsg typecode %d", msg.type_);
00183 }
00184
00185
00186
00187
00188
00189 if (contact_ != NULL && contact_->link()->state() == Link::BUSY)
00190 {
00191 if (cmdqueue_.size() == (params_->busy_queue_depth_ - 1)) {
00192 BundleDaemon::post(
00193 new LinkStateChangeRequest(contact_->link(),
00194 Link::AVAILABLE,
00195 ContactEvent::UNBLOCKED));
00196 }
00197 }
00198 }
00199
00200
00201 void
00202 CLConnection::contact_up()
00203 {
00204 log_debug("contact_up");
00205 ASSERT(contact_ != NULL);
00206
00207 ASSERT(!contact_up_);
00208 contact_up_ = true;
00209
00210 BundleDaemon::post(new ContactUpEvent(contact_));
00211 }
00212
00213
00214 void
00215 CLConnection::break_contact(ContactEvent::reason_t reason)
00216 {
00217 log_debug("break_contact: %s", ContactEvent::reason_to_str(reason));
00218
00219 if (reason != ContactEvent::BROKEN) {
00220 disconnect();
00221 }
00222
00223 contact_broken_ = true;
00224
00225
00226
00227
00228
00229
00230
00231
00232 if ((reason != ContactEvent::USER) && (contact_ != NULL)) {
00233 BundleDaemon::post(
00234 new LinkStateChangeRequest(contact_->link(),
00235 Link::CLOSED,
00236 reason));
00237 }
00238 }
00239
00240
00241 void
00242 CLConnection::close_contact()
00243 {
00244 LinkParams* params = dynamic_cast<LinkParams*>(contact_->link()->cl_info());
00245 ASSERT(params != NULL);
00246
00247
00248
00249 while (! inflight_.empty()) {
00250 InFlightBundle* inflight = inflight_.front();
00251
00252
00253 ASSERT(inflight->bundle_.object() != NULL);
00254 inflight->bundle_->payload_.close_file();
00255
00256 size_t sent_bytes = inflight->sent_data_.num_contiguous();
00257 size_t acked_bytes = inflight->ack_data_.num_contiguous();
00258
00259 if ((! params->reactive_frag_enabled_) ||
00260 (sent_bytes == 0) ||
00261 (contact_->link()->is_reliable() && acked_bytes == 0))
00262 {
00263 log_debug("posting transmission failed event "
00264 "(reactive fragmentation %s, %s link, acked_bytes %zu)",
00265 params->reactive_frag_enabled_ ? "enabled" : "disabled",
00266 contact_->link()->is_reliable() ? "reliable" : "unreliable",
00267 acked_bytes);
00268
00269 BundleDaemon::post(
00270 new BundleTransmitFailedEvent(inflight->bundle_.object(),
00271 contact_));
00272
00273 } else {
00274
00275
00276 sent_bytes -= inflight->header_block_length_;
00277 if (acked_bytes > inflight->header_block_length_) {
00278 acked_bytes -= inflight->header_block_length_;
00279 } else {
00280 acked_bytes = 0;
00281 }
00282
00283 BundleDaemon::post(
00284 new BundleTransmittedEvent(inflight->bundle_.object(),
00285 contact_,
00286 sent_bytes, acked_bytes));
00287 }
00288
00289 inflight_.pop_front();
00290 }
00291
00292
00293
00294
00295 if (! incoming_.empty()) {
00296 IncomingBundle* incoming = incoming_.back();
00297
00298 size_t rcvd_len = incoming->rcvd_data_.last() + 1;
00299
00300 if ((incoming->total_length_ == 0) &&
00301 params->reactive_frag_enabled_ &&
00302 (rcvd_len > incoming->header_block_length_))
00303 {
00304 log_debug("partial arrival of bundle: "
00305 "got %zu bytes [hdr %zu payload %zu]",
00306 rcvd_len, incoming->header_block_length_,
00307 incoming->bundle_->payload_.length());
00308
00309
00310
00311
00312 size_t payload_rcvd = rcvd_len - incoming->header_block_length_;
00313
00314
00315 ASSERT(incoming->bundle_.object() != NULL);
00316 incoming->bundle_->payload_.close_file();
00317
00318 BundleDaemon::post(
00319 new BundleReceivedEvent(incoming->bundle_.object(),
00320 EVENTSRC_PEER,
00321 payload_rcvd));
00322 }
00323 }
00324
00325
00326 while (!incoming_.empty()) {
00327 IncomingBundle* incoming = incoming_.back();
00328 incoming_.pop_back();
00329 delete incoming;
00330 }
00331
00332
00333
00334
00335 if (cmdqueue_.size() > 0) {
00336 log_warn("close_contact: %zu CL commands still in queue: ",
00337 cmdqueue_.size());
00338
00339 while (cmdqueue_.size() != 0) {
00340 CLMsg msg;
00341 bool ok = cmdqueue_.try_pop(&msg);
00342 ASSERT(ok);
00343
00344 log_warn("close_contact: %s still in queue", clmsg_to_str(msg.type_));
00345
00346 if (msg.type_ == CLMSG_SEND_BUNDLE) {
00347 BundleDaemon::post(
00348 new BundleTransmitFailedEvent(msg.bundle_.object(),
00349 contact_));
00350 }
00351 }
00352 }
00353 }
00354
00355
00356 void
00357 CLConnection::handle_announce_bundle(Bundle* announce)
00358 {
00359 log_debug("got announce bundle: source eid %s", announce->source_.c_str());
00360
00361
00362
00363
00364
00365
00366
00367
00368
00369
00370
00371
00372
00373 if (contact_ == NULL) {
00374
00375 ASSERT(nexthop_ != "");
00376
00377
00378 ContactManager* cm = BundleDaemon::instance()->contactmgr();
00379
00380 Link* link = cm->find_link_to(cl_, "", announce->source_,
00381 Link::OPPORTUNISTIC,
00382 Link::AVAILABLE | Link::UNAVAILABLE);
00383
00384 if (link != NULL) {
00385 link->set_nexthop(nexthop_);
00386 log_debug("found idle opportunistic link *%p", link);
00387
00388 } else {
00389 link = cm->new_opportunistic_link(cl_,
00390 nexthop_.c_str(),
00391 announce->source_);
00392 log_debug("created new opportunistic link *%p", link);
00393 }
00394
00395 ASSERT(! link->isopen());
00396
00397 contact_ = new Contact(link);
00398 contact_->set_cl_info(this);
00399 link->set_contact(contact_.object());
00400
00401
00402
00403
00404
00405
00406 LinkParams* lparams = dynamic_cast<LinkParams*>(link->cl_info());
00407 ASSERT(lparams != NULL);
00408 params_ = lparams;
00409 }
00410 }
00411
00412
00413 }