00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include <oasys/util/OptParser.h>
00040
00041 #include "ConnectionConvergenceLayer.h"
00042 #include "CLConnection.h"
00043 #include "bundling/BundleDaemon.h"
00044
00045 namespace dtn {
00046
00047
00048 ConnectionConvergenceLayer::LinkParams::LinkParams(bool init_defaults)
00049 : busy_queue_depth_(10),
00050 reactive_frag_enabled_(true),
00051 sendbuf_len_(32768),
00052 recvbuf_len_(32768),
00053 data_timeout_(30000),
00054 test_read_delay_(0),
00055 test_write_delay_(0),
00056 test_recv_delay_(0)
00057 {
00058 (void)init_defaults;
00059 }
00060
00061
00062 ConnectionConvergenceLayer::ConnectionConvergenceLayer(const char* classname,
00063 const char* cl_name)
00064 : ConvergenceLayer(classname, cl_name)
00065 {
00066 }
00067
00068
00069 bool
00070 ConnectionConvergenceLayer::parse_link_params(LinkParams* params,
00071 int argc, const char** argv,
00072 const char** invalidp)
00073 {
00074 oasys::OptParser p;
00075
00076 p.addopt(new oasys::UIntOpt("busy_queue_depth",
00077 ¶ms->busy_queue_depth_));
00078 p.addopt(new oasys::BoolOpt("reactive_frag_enabled",
00079 ¶ms->reactive_frag_enabled_));
00080 p.addopt(new oasys::UIntOpt("sendbuf_len", ¶ms->sendbuf_len_));
00081 p.addopt(new oasys::UIntOpt("recvbuf_len", ¶ms->recvbuf_len_));
00082 p.addopt(new oasys::UIntOpt("data_timeout", ¶ms->data_timeout_));
00083
00084 p.addopt(new oasys::UIntOpt("test_read_delay",
00085 ¶ms->test_read_delay_));
00086 p.addopt(new oasys::UIntOpt("test_write_delay",
00087 ¶ms->test_write_delay_));
00088 p.addopt(new oasys::UIntOpt("test_recv_delay",
00089 ¶ms->test_recv_delay_));
00090
00091 if (! p.parse(argc, argv, invalidp)) {
00092 return false;
00093 }
00094
00095 if (params->sendbuf_len_ == 0) {
00096 *invalidp = "sendbuf_len must not be zero";
00097 return false;
00098 }
00099
00100 if (params->recvbuf_len_ == 0) {
00101 *invalidp = "recvbuf_len must not be zero";
00102 return false;
00103 }
00104
00105 return true;
00106 }
00107
00108
00109 void
00110 ConnectionConvergenceLayer::dump_link(Link* link, oasys::StringBuffer* buf)
00111 {
00112 LinkParams* params = dynamic_cast<LinkParams*>(link->cl_info());
00113 ASSERT(params != NULL);
00114
00115 buf->appendf("busy_queue_depth: %u\n", params->busy_queue_depth_);
00116 buf->appendf("reactive_frag_enabled: %u\n", params->reactive_frag_enabled_);
00117 buf->appendf("sendbuf_len: %u\n", params->sendbuf_len_);
00118 buf->appendf("recvbuf_len: %u\n", params->recvbuf_len_);
00119 buf->appendf("data_timeout: %u\n", params->data_timeout_);
00120 buf->appendf("test_read_delay: %u\n", params->test_read_delay_);
00121 buf->appendf("test_write_delay: %u\n", params->test_write_delay_);
00122 buf->appendf("test_recv_delay: %u\n",params->test_recv_delay_);
00123 }
00124
00125
00126 bool
00127 ConnectionConvergenceLayer::init_link(Link* link, int argc, const char* argv[])
00128 {
00129 log_debug("adding %s link %s", link->type_str(), link->nexthop());
00130
00131
00132
00133 LinkParams* params = new_link_params();
00134
00135 if (! parse_nexthop(link, params)) {
00136 log_err("error parsing link nexthop '%s'", link->nexthop());
00137 delete params;
00138 return false;
00139 }
00140
00141 const char* invalid;
00142 if (! parse_link_params(params, argc, argv, &invalid)) {
00143 log_err("error parsing link options: invalid option '%s'", invalid);
00144 delete params;
00145 return false;
00146 }
00147
00148 if (! finish_init_link(link, params)) {
00149 log_err("error in finish_init_link");
00150 delete params;
00151 return false;
00152 }
00153
00154 link->set_cl_info(params);
00155
00156 return true;
00157 }
00158
00159
00160 bool
00161 ConnectionConvergenceLayer::finish_init_link(Link* link, LinkParams* params)
00162 {
00163 (void)link;
00164 (void)params;
00165 return true;
00166 }
00167
00168
00169 bool
00170 ConnectionConvergenceLayer::reconfigure_link(Link* link,
00171 int argc, const char* argv[])
00172 {
00173 LinkParams* params = dynamic_cast<LinkParams*>(link->cl_info());
00174 ASSERT(params != NULL);
00175
00176 const char* invalid;
00177 if (! parse_link_params(params, argc, argv, &invalid)) {
00178 log_err("reconfigure_link: invalid parameter %s", invalid);
00179 return false;
00180 }
00181
00182 if (link->isopen()) {
00183 LinkParams* params = dynamic_cast<LinkParams*>(link->cl_info());
00184 ASSERT(params != NULL);
00185
00186 CLConnection* conn = dynamic_cast<CLConnection*>(link->contact()->cl_info());
00187 ASSERT(conn != NULL);
00188
00189 if ((params->sendbuf_len_ != conn->sendbuf_.size()) &&
00190 (params->sendbuf_len_ >= conn->sendbuf_.fullbytes()))
00191 {
00192 log_info("resizing link *%p send buffer from %zu -> %u",
00193 link, conn->sendbuf_.size(), params->sendbuf_len_);
00194 conn->sendbuf_.set_size(params->sendbuf_len_);
00195 }
00196
00197 if ((params->recvbuf_len_ != conn->recvbuf_.size()) &&
00198 (params->recvbuf_len_ >= conn->recvbuf_.fullbytes()))
00199 {
00200 log_info("resizing link *%p recv buffer from %zu -> %u",
00201 link, conn->recvbuf_.size(), params->recvbuf_len_);
00202 conn->recvbuf_.set_size(params->recvbuf_len_);
00203 }
00204 }
00205
00206 return true;
00207 }
00208
00209
00210 bool
00211 ConnectionConvergenceLayer::open_contact(const ContactRef& contact)
00212 {
00213 Link* link = contact->link();
00214 log_debug("opening contact on link *%p", link);
00215
00216 LinkParams* params = dynamic_cast<LinkParams*>(link->cl_info());
00217 ASSERT(params != NULL);
00218
00219
00220
00221 CLConnection* conn = new_connection(params);
00222 conn->set_contact(contact);
00223 contact->set_cl_info(conn);
00224 conn->start();
00225
00226 return true;
00227 }
00228
00229
00230 bool
00231 ConnectionConvergenceLayer::close_contact(const ContactRef& contact)
00232 {
00233 log_info("close_contact *%p", contact.object());
00234
00235 CLConnection* conn = dynamic_cast<CLConnection*>(contact->cl_info());
00236 ASSERT(conn != NULL);
00237
00238
00239
00240 if (! conn->contact_broken_) {
00241 conn->cmdqueue_.push_back(
00242 CLConnection::CLMsg(CLConnection::CLMSG_BREAK_CONTACT));
00243 }
00244
00245 while (!conn->is_stopped()) {
00246 log_debug("waiting for connection thread to stop...");
00247 usleep(100000);
00248 oasys::Thread::yield();
00249 }
00250
00251 conn->close_contact();
00252 delete conn;
00253
00254 contact->set_cl_info(NULL);
00255
00256 return true;
00257 }
00258
00259
00260 void
00261 ConnectionConvergenceLayer::send_bundle(const ContactRef& contact,
00262 Bundle* bundle)
00263 {
00264 log_debug("send_bundle *%p to *%p", bundle, contact.object());
00265
00266 CLConnection* conn = dynamic_cast<CLConnection*>(contact->cl_info());
00267 ASSERT(conn != NULL);
00268
00269 LinkParams* params = dynamic_cast<LinkParams*>(contact->link()->cl_info());
00270 ASSERT(params != NULL);
00271
00272 conn->cmdqueue_.push_back(
00273 CLConnection::CLMsg(CLConnection::CLMSG_SEND_BUNDLE, bundle));
00274
00275
00276
00277 if (conn->cmdqueue_.size() >= params->busy_queue_depth_)
00278 {
00279 contact->link()->set_state(Link::BUSY);
00280 }
00281 }
00282
00283 }