00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019 #include <unistd.h>
00020 #include <errno.h>
00021
00022
00023
00024
00025
00026 #include "dtn_api.h"
00027 #include "TcaController.h"
00028
00029
00030 static const int debug = 1;
00031
00032
00033
00034
00035 static const int RECV_TIMEOUT = 30000;
00036
00037
00038
00039
00040
00041
00042
00043
00044 static const u_int32_t REG_EXPIRATION_TIME = 2000000;
00045
00046
00047
00048
00049
00050 bool
00051 make_spec(dtn_bundle_spec_t& spec,
00052 std::string source,
00053 std::string dest,
00054 std::string replyto,
00055 int expiration,
00056 dtn_bundle_priority_t priority = COS_NORMAL,
00057 dtn_bundle_delivery_opts_t dopts = DOPTS_NONE
00058 )
00059 {
00060 memset(&spec, 0, sizeof(spec));
00061
00062 if (dtn_parse_eid_string(&spec.source, source.c_str()))
00063 {
00064 fprintf(stderr, "make_spec: invalid source eid '%s'\n",
00065 source.c_str());
00066 return false;
00067 }
00068
00069 if (dtn_parse_eid_string(&spec.dest, dest.c_str()))
00070 {
00071 fprintf(stderr, "make_spec: invalid dest eid '%s'\n",
00072 dest.c_str());
00073 return false;
00074 }
00075
00076 if (dtn_parse_eid_string(&spec.replyto, replyto.c_str()))
00077 {
00078 fprintf(stderr, "make_spec: invalid replyto eid '%s'\n",
00079 replyto.c_str());
00080 return false;
00081 }
00082
00083 spec.priority = priority;
00084 spec.dopts = dopts;
00085 spec.expiration = expiration;
00086
00087 return true;
00088 }
00089
00090
00091 static bool
00092 check_nargs(const dtn::TcaControlBundle& cb, uint n_expected)
00093 {
00094 if (cb.args_.size() != n_expected)
00095 {
00096 printf("TcaController: bundle '%s' contains wrong number of args. "
00097 "%d expected.\n", cb.str().c_str(), n_expected);
00098 return false;
00099 }
00100 return true;
00101 }
00102
00103
00104 TcaController::TcaController(TcaController::Role role,
00105 const std::string& link_id,
00106 const std::string& ask_addr,
00107 const std::string& adv_str,
00108 int registry_ttl, int control_ttl)
00109 : role_(role), link_id_(link_id),
00110 ask_addr_(ask_addr), adv_str_(adv_str),
00111 registry_ttl_(registry_ttl), control_ttl_(control_ttl)
00112
00113 {
00114
00115 }
00116
00117
00118 TcaController::~TcaController()
00119 {
00120 dtn_close(handle_);
00121 }
00122
00123
00124 bool
00125 TcaController::dtn_reg(dtn_endpoint_id_t& eid, dtn_reg_id_t& id)
00126 {
00127
00128
00129
00130 dtn_reg_info_t reginfo;
00131
00132 int ret;
00133
00134 memset(®info, 0, sizeof(reginfo));
00135 dtn_copy_eid(®info.endpoint, &eid);
00136 reginfo.failure_action = DTN_REG_DEFER;
00137 reginfo.regid = DTN_REGID_NONE;
00138 reginfo.expiration = REG_EXPIRATION_TIME;
00139 if ((ret = dtn_register(handle_, ®info, &id)) != 0) {
00140 fprintf(stderr, "error creating registration: %d (%s)\n",
00141 ret, dtn_strerror(dtn_errno(handle_)));
00142 return false;
00143 }
00144
00145
00146 printf("TcaController::dtn_reg: app registered as %s, id=0x%x\n",
00147 eid.uri, id);
00148
00149 return true;
00150 }
00151
00152
00153 bool
00154 TcaController::init(bool tidy)
00155 {
00156
00157 int err = dtn_open(&handle_);
00158 if (err != DTN_SUCCESS) {
00159 fprintf(stderr, "fatal error opening dtn handle: %s\n",
00160 dtn_strerror(err));
00161 return false;
00162 }
00163
00164 printf("TcaController::init: dtn_open succeeded\n");
00165
00166
00167 dtn_reg_id_t app_id;
00168 dtn_build_local_eid(handle_, &local_eid_, "/admin");
00169 if (!dtn_reg(local_eid_, app_id)) return false;
00170
00171
00172
00173
00174
00175
00176
00177
00178
00179
00180 if (tidy) eat_bundles();
00181
00182 if (role_ == TCA_GATEWAY)
00183 {
00184
00185 registry_.init_nodes();
00186 if (!registry_.init_addrs())
00187 {
00188
00189
00190
00191 fprintf(stderr,
00192 "TcaController fatal error: no registry nodes available!\n");
00193 return false;
00194 }
00195 }
00196
00197 return true;
00198 }
00199
00200
00201 void
00202 TcaController::run()
00203 {
00204
00205 if (ask_addr_.length() != 0)
00206 {
00207
00208 ask(ask_addr_);
00209 }
00210
00211 dtn_bundle_spec_t spec;
00212 std::string payload;
00213
00214 for (;;)
00215 {
00216 if (recv_bundle(spec, payload, RECV_TIMEOUT))
00217 {
00218 handle_bundle_received(spec, payload);
00219 }
00220 }
00221 }
00222
00223
00224 bool
00225 TcaController::handle_bundle_received(const dtn_bundle_spec_t& spec,
00226 const std::string& payload)
00227 {
00228 dtn::TcaControlBundle cb(payload);
00229
00230
00231 switch (cb.type_)
00232 {
00233 case dtn::TcaControlBundle::CB_LINK_ANNOUNCE:
00234 handle_link_announce(spec, cb);
00235 break;
00236 case dtn::TcaControlBundle::CB_ASK:
00237 handle_ask(spec, cb);
00238 break;
00239 case dtn::TcaControlBundle::CB_ASK_RECEIVED:
00240 handle_ask_received(spec, cb);
00241 break;
00242 case dtn::TcaControlBundle::CB_ASK_SENT:
00243 handle_ask_sent(spec, cb);
00244 break;
00245 case dtn::TcaControlBundle::CB_ADV:
00246 handle_adv(spec, cb);
00247 break;
00248 case dtn::TcaControlBundle::CB_ADV_SENT:
00249 handle_adv_sent(spec, cb);
00250 break;
00251 case dtn::TcaControlBundle::CB_REG_RECEIVED:
00252 handle_reg_received(spec, cb);
00253 break;
00254 case dtn::TcaControlBundle::CB_UNB:
00255 handle_unb(spec, cb);
00256 break;
00257 case dtn::TcaControlBundle::CB_COA:
00258
00259
00260 break;
00261 case dtn::TcaControlBundle::CB_COA_SENT:
00262 handle_coa_sent(spec, cb);
00263 break;
00264 case dtn::TcaControlBundle::CB_ROUTES:
00265 handle_routes(spec, cb);
00266 break;
00267 case dtn::TcaControlBundle::CB_LINK_AVAILABLE:
00268 break;
00269 case dtn::TcaControlBundle::CB_LINK_UNAVAILABLE:
00270 break;
00271 case dtn::TcaControlBundle::CB_CONTACT_UP:
00272 break;
00273 case dtn::TcaControlBundle::CB_CONTACT_DOWN:
00274 break;
00275 default:
00276 printf("TcaController: unrecognized bundle code received: '%s'\n",
00277 cb.code_.c_str());
00278 }
00279
00280 return true;
00281 }
00282
00283
00284
00285 bool
00286 TcaController::handle_reg_received(const dtn_bundle_spec_t& spec,
00287 const dtn::TcaControlBundle& cb)
00288 {
00289 switch (role_)
00290 {
00291 case TCA_MOBILE:
00292
00293
00294
00295 return route_reg(spec, cb);
00296 break;
00297 case TCA_ROUTER:
00298 return route_reg(spec, cb);
00299 break;
00300 case TCA_GATEWAY:
00301 return gate_reg(spec, cb);
00302 break;
00303 }
00304
00305 return false;
00306 }
00307
00308
00309 bool
00310 TcaController::handle_unb(const dtn_bundle_spec_t& spec,
00311 const dtn::TcaControlBundle& cb)
00312 {
00313 (void)spec;
00314
00315 if (role_ != TCA_GATEWAY)
00316 {
00317 printf("TcaController error: non-gateway received unb bundle.\n");
00318 return false;
00319 }
00320
00321 TcaEndpointID dest_eid(cb.args_[0]);
00322 RegRecord rr;
00323
00324 if (!get_registration(dest_eid, rr))
00325 {
00326 printf("TcaController: bind failed: unregistered node [%s]\n",
00327 dest_eid.c_str());
00328
00329
00330
00331
00332 return false;
00333 }
00334
00335
00336
00337
00338
00339 if (!add_route(dest_eid.str(), rr.link_addr_)) return false;
00340
00341 return true;
00342 }
00343
00344
00345 bool
00346 TcaController::handle_coa_sent(const dtn_bundle_spec_t& spec,
00347 const dtn::TcaControlBundle& cb)
00348 {
00349 (void)spec;
00350
00351 if (role_ == TCA_MOBILE)
00352 {
00353 printf("TcaController error: mobile received coa_sent bundle.\n");
00354 return false;
00355 }
00356
00357
00358 const std::string& src = cb.args_[0];
00359 const std::string& dest = cb.args_[1];
00360 const std::string& link = cb.args_[2];
00361
00362
00363 TcaEndpointID pattern = dest;
00364 pattern.set_app("*");
00365 del_route(pattern.str());
00366
00367
00368
00369
00370 if (src == local_eid_.uri)
00371 {
00372 TcaEndpointID tca_dest = dest;
00373 if (!do_registration(tca_dest, link_id_)) return false;
00374
00375 tca_dest.set_app("*");
00376 if (!add_route(tca_dest.str(), link)) return false;
00377 }
00378
00379 return true;
00380 }
00381
00382
00383 bool
00384 TcaController::handle_link_announce(const dtn_bundle_spec_t& spec,
00385 const dtn::TcaControlBundle& cb)
00386 {
00387 (void)spec;
00388
00389
00390
00391 if (!check_nargs(cb, 1)) return false;
00392 std::string link_spec = cb.args_[0];
00393 link_spec += ":5000";
00394 return ask(link_spec);
00395 }
00396
00397
00398 bool
00399 TcaController::handle_ask(const dtn_bundle_spec_t& spec,
00400 const dtn::TcaControlBundle& cb)
00401 {
00402 (void)spec;
00403 (void)cb;
00404 printf("error -- we should never receive an ask bundle!\n");
00405
00406 return true;
00407 }
00408
00409
00410 bool
00411 TcaController::handle_ask_received(const dtn_bundle_spec_t& spec,
00412 const dtn::TcaControlBundle& cb)
00413 {
00414 (void)spec;
00415 dtn::TcaWrappedBundle wb(cb);
00416
00417
00418
00419 TcaEndpointID src_eid = wb.source();
00420 src_eid.set_app("*");
00421
00422 if (!add_route(src_eid.str(), wb.args_[2])) return false;
00423
00424
00425
00426
00427 std::string response = "adv:";
00428 response += wb.dest();
00429 response += "\t";
00430 response += adv_str_;
00431 send_bundle(wb.source(), response);
00432
00433 return true;
00434 }
00435
00436
00437 bool
00438 TcaController::handle_ask_sent(const dtn_bundle_spec_t& spec,
00439 const dtn::TcaControlBundle& cb)
00440 {
00441 (void)spec;
00442 dtn::TcaWrappedBundle wb(cb);
00443
00444
00445 TcaEndpointID dest_eid = wb.dest();
00446 dest_eid.set_app("*");
00447
00448 if (!del_route(dest_eid.str())) return false;
00449 return true;
00450 }
00451
00452
00453 bool
00454 TcaController::handle_adv(const dtn_bundle_spec_t& spec,
00455 const dtn::TcaControlBundle& cb)
00456 {
00457 (void)spec;
00458 (void)cb;
00459
00460
00461
00462 return true;
00463 }
00464
00465
00466 bool
00467 TcaController::handle_adv_sent(const dtn_bundle_spec_t& spec,
00468 const dtn::TcaControlBundle& cb)
00469 {
00470 (void)spec;
00471 dtn::TcaWrappedBundle wb(cb);
00472
00473
00474
00475
00476
00477
00478
00479 TcaEndpointID dest_eid = wb.dest();
00480 dest_eid.set_app("*");
00481
00482 if (!del_route(dest_eid.str())) return false;
00483 return true;
00484 }
00485
00486
00487 bool
00488 TcaController::handle_routes(const dtn_bundle_spec_t& spec,
00489 const dtn::TcaControlBundle& cb)
00490 {
00491 (void)spec;
00492
00493 printf("routes:\n");
00494 for (unsigned int i=0; i<cb.args_.size(); ++i)
00495 {
00496 printf(" %s \n", cb.args_[i].c_str());
00497 }
00498 return true;
00499 }
00500
00501
00502 bool
00503 TcaController::route_reg(const dtn_bundle_spec_t& spec,
00504 const dtn::TcaControlBundle& cb)
00505 {
00506 (void)spec;
00507
00508
00509
00510
00511
00512 if (!check_nargs(cb, 4)) return false;
00513
00514 dtn::TcaWrappedBundle wb(cb);
00515
00516 std::string mobile_eid = wb.args_[2];
00517 std::string last_hop = wb.args_[3];
00518
00519 if (last_hop != "NULL")
00520 {
00521
00522
00523
00524 mobile_eid = wb.args_[2];
00525 last_hop = wb.args_[3];
00526
00527 TcaEndpointID pattern(mobile_eid);
00528
00529
00530
00531
00532
00533 pattern.set_app("*");
00534 printf("deleteing routes for %s\n", pattern.str().c_str());
00535 if (!del_route(pattern.str())) return false;
00536
00537
00538 if (!add_route(pattern.str(), last_hop)) return false;
00539 }
00540
00541
00542
00543
00544
00545 if (role_ != TCA_GATEWAY)
00546 {
00547 std::string new_payload = "register:";
00548 new_payload += mobile_eid;
00549 new_payload += "\t";
00550 new_payload += link_id_;
00551
00552 if (!send_bundle("tca://registry", new_payload)) return false;
00553 }
00554
00555 return true;
00556 }
00557
00558
00559 bool
00560 TcaController::gate_reg(const dtn_bundle_spec_t& spec,
00561 const dtn::TcaControlBundle& cb)
00562 {
00563 (void)spec;
00564 dtn::TcaWrappedBundle wb(cb);
00565
00566 std::string mobile_eid = wb.args_[2];
00567 std::string last_hop = wb.args_[3];
00568
00569 TcaEndpointID src(mobile_eid);
00570
00571
00572
00573
00574 RegRecord old_reg;
00575 bool prev_reg_exists = get_registration(src, old_reg);
00576
00577 if (prev_reg_exists)
00578 {
00579
00580 if (old_reg.link_addr_ == link_id_)
00581 {
00582 printf("TcaController: ignoring re-registration with same"
00583 " gateway\n");
00584
00585
00586
00587
00588
00589
00590
00591
00592 return true;
00593 }
00594 else
00595 {
00596 printf("TcaController: initiating change-of-address\n");
00597
00598
00599
00600
00601
00602
00603
00604
00605
00606
00607
00608 TcaEndpointID pattern = src;
00609 pattern.set_app("admin.coa");
00610
00611 printf("TcaController: adding route %s -> %s\n",
00612 pattern.c_str(), old_reg.link_addr_.c_str());
00613
00614 if (!add_route(pattern.str(), old_reg.link_addr_)) return false;
00615
00616
00617 TcaEndpointID dest_eid = src;
00618 dest_eid.set_app("admin.coa");
00619
00620
00621
00622
00623
00624
00625
00626
00627
00628
00629
00630
00631
00632 std::string coa_payload = "coa:";
00633 coa_payload += last_hop;
00634
00635 dtn_bundle_spec_t coa_spec;
00636 if (!make_spec(coa_spec, local_eid_.uri, dest_eid.str(),
00637 local_eid_.uri, control_ttl_, COS_NORMAL, DOPTS_NONE))
00638 return false;
00639
00640
00641
00642
00643
00644 if (!send_bundle(coa_spec, coa_payload)) return false;
00645
00646
00647
00648
00649
00650 }
00651
00652
00653
00654
00655
00656 }
00657
00658 else
00659 {
00660
00661
00662
00663
00664 if (!do_registration(src, link_id_)) return false;
00665
00666
00667 if (!route_reg(spec, cb)) return false;
00668 }
00669
00670 return true;
00671
00672 }
00673
00674
00675 bool
00676 TcaController::ask(const std::string& link)
00677 {
00678
00679
00680
00681
00682
00683 std::string host = "tca://anonymous";
00684
00685 host += link.substr(6, link.length());
00686
00687 std::string pattern = host + "/*";
00688
00689 if (!add_route(pattern, link)) return false;
00690
00691 std::string payload = "ask:";
00692 payload += link_id_;
00693
00694 std::string dest = host + "/admin.ask";
00695
00696 if (!send_bundle(dest, payload))
00697 {
00698 fprintf(stderr,
00699 "TcaController::ask: error: failed to send ask bundle\n");
00700 return false;
00701 }
00702
00703 return true;
00704 }
00705
00706
00707 bool
00708 TcaController::get_routes()
00709 {
00710 if (!send_bundle("tca://localhost/bundlelayer", "get_routes:tca://*"))
00711 {
00712 fprintf(stderr, "error: failed to send get_routes bundle\n");
00713 return false;
00714 }
00715
00716 return true;
00717 }
00718
00719
00720 bool
00721 TcaController::add_route(const std::string& route_pattern,
00722 const std::string& link)
00723 {
00724 std::string body = "add_route:";
00725 body += route_pattern;
00726 body += "\t";
00727 body += link;
00728
00729
00730 if (!send_bundle("tca://localhost/bundlelayer", body))
00731 {
00732 fprintf(stderr, "error: failed to send add_route bundle\n");
00733 return false;
00734 }
00735
00736 return true;
00737 }
00738
00739
00740 bool
00741 TcaController::del_route(const std::string& route_pattern)
00742 {
00743 std::string body = "del_route:";
00744 body += route_pattern;
00745
00746 if (!send_bundle("tca://localhost/bundlelayer", body))
00747 {
00748 fprintf(stderr, "error: failed to send del_route bundle\n");
00749 return false;
00750 }
00751
00752 return true;
00753 }
00754
00755
00756 bool
00757 TcaController::get_registration(const TcaEndpointID& eid, RegRecord& rr)
00758 {
00759 rr.host_ = eid.get_hostid();
00760 if (registry_.read(rr))
00761 {
00762 printf("TcaController: found registry entry %s\n", rr.str().c_str());
00763 return true;
00764 }
00765 else
00766 {
00767 printf("TcaController: no registry entry for host %s\n",
00768 rr.host_.c_str());
00769 return false;
00770 }
00771 }
00772
00773
00774
00775 bool
00776 TcaController::do_registration(const TcaEndpointID& eid,
00777 const std::string& link_addr)
00778 {
00779 RegRecord rr(eid.get_hostid(), link_addr);
00780
00781 if (registry_.write(rr, registry_ttl_))
00782 {
00783 printf("TcaController: wrote registry entry %s\n", rr.str().c_str());
00784 return true;
00785 }
00786 else
00787 {
00788 printf("TcaController: failed to write registry entry %s\n",
00789 rr.str().c_str());
00790 return false;
00791 }
00792
00793 }
00794
00795
00796 bool
00797 TcaController::test_all()
00798 {
00799 get_routes();
00800
00801 add_route("tca://booyah", "tcp://9.8.7.6:54321");
00802 sleep(5);
00803
00804 get_routes();
00805
00806 del_route("tca://booyah");
00807 sleep(5);
00808
00809 get_routes();
00810
00811 return true;
00812 }
00813
00814
00815 void
00816 TcaController::eat_bundles(bool verbose)
00817 {
00818
00819
00820 dtn_bundle_spec_t recv_spec;
00821 std::string payload;
00822
00823 printf("checking for queued bundles...\n");
00824
00825 if (verbose)
00826 {
00827 while (recv_bundle(recv_spec, payload, 0))
00828 printf(" discarding bundle: %s\n", payload.c_str());
00829 }
00830 else
00831 {
00832 dtn_bundle_payload_t recv_payload;
00833 int ret;
00834 do
00835 {
00836 memset(&recv_spec, 0, sizeof(recv_spec));
00837 memset(&recv_payload, 0, sizeof(recv_payload));
00838
00839 ret = dtn_recv(handle_, &recv_spec,
00840 DTN_PAYLOAD_MEM, &recv_payload, 0);
00841
00842 if (ret == 0)
00843 {
00844 fprintf(stderr, "error: unexpected bundle already queued... "
00845 "discarding\n");
00846 }
00847 else if (dtn_errno(handle_) != DTN_ETIMEOUT)
00848 {
00849 fprintf(stderr, "error: "
00850 "unexpected error checking for queued bundles: %s\n",
00851 dtn_strerror(dtn_errno(handle_)));
00852 return;
00853 }
00854 } while (ret == 0);
00855 }
00856 }
00857
00858
00859 bool
00860 TcaController::send_bundle(const dtn_bundle_spec_t& spec,
00861 const std::string& payload)
00862 {
00863 printf("send_bundle: [%s] -> [%s] : '%s'\n",
00864 spec.source.uri, spec.dest.uri, payload.c_str());
00865
00866 dtn_bundle_payload_t send_payload;
00867 memset(&send_payload, 0, sizeof(send_payload));
00868 dtn_set_payload(&send_payload, DTN_PAYLOAD_MEM,
00869 const_cast<char*>(payload.c_str()), payload.length());
00870
00871 dtn_bundle_id_t bundle_id;
00872 memset(&bundle_id, 0, sizeof(bundle_id));
00873
00874 int r = 0;
00875 if ((r = dtn_send(handle_,
00876 const_cast<dtn_bundle_spec_t*>(&spec),
00877 &send_payload, &bundle_id)) != 0)
00878 {
00879 fprintf(stderr, "TcaController::send_bundle error %d (%s)\n",
00880 r, dtn_strerror(dtn_errno(handle_)));
00881 return false;
00882 }
00883
00884 return true;
00885 }
00886
00887
00888 bool
00889 TcaController::recv_bundle(dtn_bundle_spec_t& spec,
00890 std::string& payload,
00891 unsigned int timeout)
00892 {
00893 dtn_bundle_payload_t recv_payload;
00894 memset(&spec, 0, sizeof(spec));
00895 memset(&recv_payload, 0, sizeof(recv_payload));
00896
00897
00898
00899 int r;
00900 if ((r = dtn_recv(handle_, &spec,
00901 DTN_PAYLOAD_MEM, &recv_payload, timeout)) < 0)
00902 {
00903
00904
00905 return false;
00906 }
00907
00908 int n = recv_payload.buf.buf_len;
00909 char s_buf[n+1];
00910 memcpy(s_buf, recv_payload.buf.buf_val, n);
00911 s_buf[n] = '\0';
00912
00913 payload = s_buf;
00914
00915 printf("%d bytes from [%s]: %s\n",
00916
00917 n,
00918 spec.source.uri,
00919 payload.c_str());
00920
00921 return true;
00922 }
00923
00924
00925
00926 bool
00927 TcaController::send_bundle(const std::string& dest,
00928 const std::string& payload)
00929 {
00930
00931
00932
00933 dtn_bundle_spec_t spec;
00934 memset(&spec, 0, sizeof(spec));
00935
00936
00937 dtn_copy_eid(&spec.source, &local_eid_);
00938 dtn_copy_eid(&spec.replyto, &local_eid_);
00939
00940 if (dtn_parse_eid_string(&spec.dest, dest.c_str()))
00941 {
00942 fprintf(stderr, "TcaController::send_bundle: invalid destination"
00943 " eid string, %s\n", dest.c_str());
00944 return false;
00945 }
00946
00947
00948 spec.priority = COS_NORMAL;
00949 spec.dopts = DOPTS_NONE;
00950 spec.expiration = control_ttl_;
00951
00952 return send_bundle(spec, payload);
00953 }
00954
00955
00956