00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifdef HAVE_CONFIG_H
00018 # include <dtn-config.h>
00019 #endif
00020
00021 #include <oasys/util/OptParser.h>
00022 #include <oasys/util/Time.h>
00023
00024 #include "CLConnection.h"
00025 #include "bundling/BundleDaemon.h"
00026 #include "bundling/BundlePayload.h"
00027 #include "contacts/ContactManager.h"
00028
00029 namespace dtn {
00030
00031
00032 CLConnection::CLConnection(const char* classname,
00033 const char* logpath,
00034 ConnectionConvergenceLayer* cl,
00035 LinkParams* params,
00036 bool active_connector)
00037 : Thread(classname),
00038 Logger(classname, logpath),
00039 contact_(classname),
00040 contact_up_(false),
00041 cmdqueue_lock_(),
00042 cmdqueue_(logpath, &cmdqueue_lock_, false),
00043 cl_(cl),
00044 params_(params),
00045 active_connector_(active_connector),
00046 num_pollfds_(0),
00047 poll_timeout_(-1),
00048 contact_broken_(false)
00049 {
00050 sendbuf_.reserve(params_->sendbuf_len_);
00051 recvbuf_.reserve(params_->recvbuf_len_);
00052 }
00053
00054
00055 CLConnection::~CLConnection()
00056 {
00057 }
00058
00059
00060 void
00061 CLConnection::run()
00062 {
00063 struct pollfd* cmdqueue_poll;
00064
00065 initialize_pollfds();
00066 if (contact_broken_) {
00067 log_debug("contact_broken set during initialization");
00068 return;
00069 }
00070
00071 cmdqueue_poll = &pollfds_[num_pollfds_];
00072 cmdqueue_poll->fd = cmdqueue_.read_fd();
00073 cmdqueue_poll->events = POLLIN;
00074
00075
00076
00077
00078
00079 if (active_connector_) {
00080 connect();
00081 } else {
00082 accept();
00083 }
00084
00085 oasys::Time next_write(0,0);
00086
00087 while (true) {
00088 if (contact_broken_) {
00089 log_debug("contact_broken set, exiting main loop");
00090 return;
00091 }
00092
00093
00094
00095
00096
00097 if (cmdqueue_.size() != 0) {
00098 process_command();
00099 continue;
00100 }
00101
00102 oasys::Time now = oasys::Time::now();
00103
00104 int timeout;
00105 if (params_->test_write_delay_ == 0)
00106 {
00107
00108
00109
00110
00111 bool more_to_send = send_pending_data();
00112 timeout = more_to_send ? 0 : poll_timeout_;
00113 }
00114 else
00115 {
00116
00117
00118 if (now >= next_write) {
00119 bool more_to_send = send_pending_data();
00120 if (more_to_send) {
00121 next_write = now;
00122 next_write.add_milliseconds(params_->test_write_delay_);
00123 } else {
00124 next_write.sec_ = 0;
00125 next_write.usec_ = 0;
00126 }
00127 }
00128
00129
00130 if (next_write.sec_ != 0) {
00131 timeout = std::min((u_int32_t)poll_timeout_,
00132 (next_write - now).in_milliseconds());
00133 } else {
00134 timeout = poll_timeout_;
00135 }
00136
00137 log_debug("timeout is %u: next_write %u.%u (%u ms from now), poll_timeout %d",
00138 timeout, next_write.sec_, next_write.usec_,
00139 next_write.sec_ == 0 ? 0 : (next_write - now).in_milliseconds(), poll_timeout_);
00140
00141 }
00142
00143
00144
00145 if (contact_broken_) {
00146 log_debug("contact_broken set, exiting main loop");
00147 return;
00148 }
00149
00150
00151
00152
00153
00154 for (int i = 0; i < num_pollfds_ + 1; ++i) {
00155 pollfds_[i].revents = 0;
00156 }
00157
00158 log_debug("calling poll on %d fds with timeout %d",
00159 num_pollfds_ + 1, timeout);
00160
00161 int cc = oasys::IO::poll_multiple(pollfds_, num_pollfds_ + 1,
00162 timeout, NULL, logpath_);
00163
00164
00165
00166 if (contact_broken_) {
00167 log_debug("contact_broken set, exiting main loop");
00168 return;
00169 }
00170
00171 if (cc == oasys::IOTIMEOUT)
00172 {
00173 handle_poll_timeout();
00174 }
00175 else if (cc > 0)
00176 {
00177 if (cc == 1 && cmdqueue_poll->revents != 0) {
00178 continue;
00179 }
00180 handle_poll_activity();
00181 }
00182 else
00183 {
00184 log_err("unexpected return from poll_multiple: %d", cc);
00185 break_contact(ContactEvent::BROKEN);
00186 return;
00187 }
00188 }
00189 }
00190
00191
00192 void
00193 CLConnection::process_command()
00194 {
00195 CLMsg msg;
00196 bool ok = cmdqueue_.try_pop(&msg);
00197 ASSERT(ok);
00198
00199 switch(msg.type_) {
00200 case CLMSG_BUNDLES_QUEUED:
00201 log_debug("processing CLMSG_BUNDLES_QUEUED");
00202 handle_bundles_queued();
00203 break;
00204
00205 case CLMSG_CANCEL_BUNDLE:
00206 log_debug("processing CLMSG_CANCEL_BUNDLE");
00207 handle_cancel_bundle(msg.bundle_.object());
00208 break;
00209
00210 case CLMSG_BREAK_CONTACT:
00211 log_debug("processing CLMSG_BREAK_CONTACT");
00212 break_contact(ContactEvent::USER);
00213 break;
00214 default:
00215 PANIC("invalid CLMsg typecode %d", msg.type_);
00216 }
00217 }
00218
00219
00220 void
00221 CLConnection::contact_up()
00222 {
00223 log_debug("contact_up");
00224 ASSERT(contact_ != NULL);
00225
00226 ASSERT(!contact_up_);
00227 contact_up_ = true;
00228
00229 BundleDaemon::post(new ContactUpEvent(contact_));
00230 }
00231
00232
00233 void
00234 CLConnection::break_contact(ContactEvent::reason_t reason)
00235 {
00236 contact_broken_ = true;
00237
00238 log_debug("break_contact: %s", ContactEvent::reason_to_str(reason));
00239
00240 if (reason != ContactEvent::BROKEN) {
00241 disconnect();
00242 }
00243
00244
00245
00246
00247
00248
00249
00250
00251 if ((reason != ContactEvent::USER) && (contact_ != NULL)) {
00252 BundleDaemon::post(
00253 new LinkStateChangeRequest(contact_->link(),
00254 Link::CLOSED,
00255 reason));
00256 }
00257 }
00258
00259
00260 bool
00261 CLConnection::find_contact(const EndpointID& peer_eid)
00262 {
00263 if (contact_ != NULL) {
00264 log_debug("CLConnection::find_contact: contact already exists");
00265 return true;
00266 }
00267
00268
00269
00270
00271
00272
00273
00274
00275
00276
00277
00278
00279
00280 ASSERT(nexthop_ != "");
00281
00282
00283 ContactManager* cm = BundleDaemon::instance()->contactmgr();
00284 oasys::ScopeLock l(cm->lock(), "CLConnection::find_contact");
00285
00286 bool new_link = false;
00287 LinkRef link = cm->find_link_to(cl_, "", peer_eid,
00288 Link::OPPORTUNISTIC,
00289 Link::AVAILABLE | Link::UNAVAILABLE);
00290
00291 if (link == NULL || (link != NULL && link->contact() != NULL)) {
00292 if (link != NULL) {
00293 log_warn("CLConnection::find_contact: "
00294 "in-use opportunistic link *%p", link.object());
00295 }
00296
00297 link = cm->new_opportunistic_link(cl_, nexthop_.c_str(), peer_eid);
00298 if (link == NULL) {
00299 log_debug("CLConnection::find_contact: "
00300 "failed to create opportunistic link");
00301 return false;
00302 }
00303
00304 new_link = true;
00305 log_debug("CLConnection::find_contact: "
00306 "created new opportunistic link *%p", link.object());
00307 }
00308
00309 ASSERT(link != NULL);
00310 oasys::ScopeLock link_lock(link->lock(), "CLConnection::find_contact");
00311
00312
00313 if (!new_link) {
00314 ASSERT(link->contact() == NULL);
00315 link->set_nexthop(nexthop_);
00316 log_debug("CLConnection::find_contact: "
00317 "found idle opportunistic link *%p", link.object());
00318 }
00319
00320
00321
00322 ASSERT(!link->isdeleted());
00323
00324 ASSERT(link->cl_info() != NULL);
00325 ASSERT(!link->isopen());
00326
00327 contact_ = new Contact(link);
00328 contact_->set_cl_info(this);
00329 link->set_contact(contact_.object());
00330
00331
00332
00333
00334
00335
00336 LinkParams* lparams = dynamic_cast<LinkParams*>(link->cl_info());
00337 ASSERT(lparams != NULL);
00338 params_ = lparams;
00339
00340 return true;
00341 }
00342
00343
00344 }