00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039 #include <sys/stat.h>
00040 #include <oasys/compat/inet_aton.h>
00041 #include <oasys/io/FileIOClient.h>
00042 #include <oasys/io/NetUtils.h>
00043 #include <oasys/util/Pointers.h>
00044 #include <oasys/util/ScratchBuffer.h>
00045 #include <oasys/util/XDRUtils.h>
00046
00047 #include "APIServer.h"
00048 #include "bundling/Bundle.h"
00049 #include "bundling/BundleEvent.h"
00050 #include "bundling/BundleDaemon.h"
00051 #include "cmd/APICommand.h"
00052 #include "reg/APIRegistration.h"
00053 #include "reg/RegistrationTable.h"
00054 #include "routing/BundleRouter.h"
00055 #include "storage/GlobalStore.h"
00056
00057 #ifndef MIN
00058 #define MIN(x, y) ((x)<(y) ? (x) : (y))
00059 #endif
00060
00061
00062 #ifdef __CYGWIN__
00063
00064
00065
00066
00067 extern "C" {
00068 extern void xdrmem_create(XDR *__xdrs, __const caddr_t __addr,
00069 u_int __size, enum xdr_op __xop);
00070 }
00071
00072
00073
00074
00075
00076 typedef void (*xdr_setpos_t)(XDR *, int);
00077 #undef xdr_setpos
00078 #define xdr_setpos(xdrs, pos) ((xdr_setpos_t)(*(xdrs)->x_ops->x_setpostn))(xdrs, pos)
00079
00080 typedef int (*xdr_getpos_t)(XDR *);
00081 #undef xdr_getpos
00082 #define xdr_getpos(xdrs) ((xdr_getpos_t)(*(xdrs)->x_ops->x_getpostn))(xdrs)
00083
00084 typedef int (*xdr_putlong_t)(XDR *, long *);
00085 #undef xdr_putlong
00086 #define xdr_putlong(xdrs, ptr) ((xdr_putlong_t)(*(xdrs)->x_ops->x_putlong))(xdrs, ptr)
00087
00088 #endif
00089
00090 namespace dtn {
00091
00092
00093 APIServer::APIServer()
00094 : TCPServerThread("APIServer", "/dtn/apiserver", DELETE_ON_EXIT)
00095 {
00096 local_addr_ = htonl(INADDR_LOOPBACK);
00097 local_port_ = DTN_IPC_PORT;
00098
00099
00100 char *env;
00101 if ((env = getenv("DTNAPI_ADDR")) != NULL) {
00102 if (inet_aton(env, (struct in_addr*)&local_addr_) == 0)
00103 {
00104 log_err("DTNAPI_ADDR environment variable (%s) "
00105 "not a valid ip address, using default of localhost",
00106 env);
00107
00108 local_addr_ = htonl(INADDR_LOOPBACK);
00109 } else {
00110 log_debug("local address set to %s by DTNAPI_ADDR "
00111 "environment variable", env);
00112 }
00113 }
00114
00115 if ((env = getenv("DTNAPI_PORT")) != NULL) {
00116 char *end;
00117 u_int port = strtoul(env, &end, 10);
00118 if (*end != '\0' || port > 0xffff)
00119 {
00120 log_err("DTNAPI_PORT environment variable (%s) "
00121 "not a valid ip port, using default of %d",
00122 env, DTN_IPC_PORT);
00123 port = DTN_IPC_PORT;
00124 } else {
00125 log_debug("api port set to %s by DTNAPI_PORT "
00126 "environment variable", env);
00127 }
00128 local_port_ = (u_int16_t)port;
00129 }
00130
00131 if (local_addr_ != INADDR_ANY || local_port_ != 0) {
00132 log_debug("APIServer init (evironment set addr %s port %d)",
00133 intoa(local_addr_), local_port_);
00134 } else {
00135 log_debug("APIServer init");
00136 }
00137
00138 oasys::TclCommandInterp::instance()->reg(new APICommand(this));
00139 }
00140
00141
00142 void
00143 APIServer::accepted(int fd, in_addr_t addr, u_int16_t port)
00144 {
00145 APIClient* c = new APIClient(fd, addr, port);
00146 c->start();
00147 }
00148
00149
00150 APIClient::APIClient(int fd, in_addr_t addr, u_int16_t port)
00151 : Thread("APIClient", DELETE_ON_EXIT),
00152 TCPClient(fd, addr, port, "/dtn/apiclient"),
00153 notifier_(logpath_)
00154 {
00155
00156 xdrmem_create(&xdr_encode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_ENCODE);
00157 xdrmem_create(&xdr_decode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_DECODE);
00158
00159 bindings_ = new APIRegistrationList();
00160 }
00161
00162
00163 APIClient::~APIClient()
00164 {
00165 log_debug("client destroyed");
00166 delete_z(bindings_);
00167 }
00168
00169
00170 void
00171 APIClient::close_session()
00172 {
00173 TCPClient::close();
00174
00175 APIRegistration* reg;
00176 while (! bindings_->empty()) {
00177 reg = bindings_->front();
00178 bindings_->pop_front();
00179
00180 reg->set_active(false);
00181
00182 if (reg->expired()) {
00183 log_debug("removing expired registration %d", reg->regid());
00184 BundleDaemon::post(new RegistrationExpiredEvent(reg->regid()));
00185 }
00186 }
00187 }
00188
00189
00190 int
00191 APIClient::handle_handshake()
00192 {
00193 u_int32_t handshake;
00194 u_int16_t message_type, ipc_version;
00195
00196 int ret = readall((char*)&handshake, sizeof(handshake));
00197 if (ret != sizeof(handshake)) {
00198 log_err("error reading handshake: (got %d/%zu), \"error\" %s",
00199 ret, sizeof(handshake), strerror(errno));
00200 return -1;
00201 }
00202
00203 message_type = ntohl(handshake) >> 16;
00204 ipc_version = (u_int16_t) (ntohl(handshake) & 0x0ffff);
00205
00206 if (message_type != DTN_OPEN) {
00207 log_err("handshake (%d)'s message type %d != DTN_OPEN (%d)",
00208 handshake, message_type, DTN_OPEN);
00209 return -1;
00210 }
00211
00212 if (ipc_version != DTN_IPC_VERSION) {
00213 log_err("handshake (%d)'s version %d != DTN_IPC_VERSION (%d)",
00214 handshake, ipc_version, DTN_IPC_VERSION);
00215 return -1;
00216 }
00217
00218 ret = writeall((char*)&handshake, sizeof(handshake));
00219 if (ret != sizeof(handshake)) {
00220 log_err("error writing handshake: %s", strerror(errno));
00221 return -1;
00222 }
00223
00224 return 0;
00225 }
00226
00227
00228 void
00229 APIClient::run()
00230 {
00231 int ret;
00232 u_int8_t type;
00233 u_int32_t len;
00234
00235 log_info("new session %s:%d -> %s:%d",
00236 intoa(local_addr()), local_port(),
00237 intoa(remote_addr()), remote_port());
00238
00239 if (handle_handshake() != 0) {
00240 close_session();
00241 return;
00242 }
00243
00244 while (true) {
00245 xdr_setpos(&xdr_encode_, 0);
00246 xdr_setpos(&xdr_decode_, 0);
00247
00248
00249
00250
00251
00252 ret = read(&buf_[3], DTN_MAX_API_MSG);
00253
00254 if (ret <= 0) {
00255 log_warn("client error or disconnection");
00256 close_session();
00257 return;
00258 }
00259
00260 if (ret < 5) {
00261 log_err("ack!! can't handle really short read...");
00262 close_session();
00263 return;
00264 }
00265
00266
00267
00268
00269
00270 type = buf_[3];
00271 memcpy(&len, &buf_[4], sizeof(len));
00272
00273 len = ntohl(len);
00274
00275 ret -= 5;
00276 log_debug("got %s (%d/%d bytes)", dtnipc_msgtoa(type), ret, len);
00277
00278
00279
00280 if (ret < (int)len) {
00281 int toget = len - ret;
00282 if (readall(&buf_[8 + ret], toget) != toget) {
00283 log_err("error reading message remainder: %s",
00284 strerror(errno));
00285 close_session();
00286 return;
00287 }
00288 }
00289
00290
00291 switch(type) {
00292 #define DISPATCH(_type, _fn) \
00293 case _type: \
00294 ret = _fn(); \
00295 break;
00296
00297 DISPATCH(DTN_LOCAL_EID, handle_local_eid);
00298 DISPATCH(DTN_REGISTER, handle_register);
00299 DISPATCH(DTN_UNREGISTER, handle_unregister);
00300 DISPATCH(DTN_FIND_REGISTRATION, handle_find_registration);
00301 DISPATCH(DTN_SEND, handle_send);
00302 DISPATCH(DTN_BIND, handle_bind);
00303 DISPATCH(DTN_UNBIND, handle_unbind);
00304 DISPATCH(DTN_RECV, handle_recv);
00305 DISPATCH(DTN_BEGIN_POLL, handle_begin_poll);
00306 DISPATCH(DTN_CANCEL_POLL, handle_cancel_poll);
00307 DISPATCH(DTN_CLOSE, handle_close);
00308 #undef DISPATCH
00309
00310 default:
00311 log_err("unknown message type code 0x%x", type);
00312 ret = DTN_EMSGTYPE;
00313 break;
00314 }
00315
00316
00317
00318 if (ret == -1) {
00319 close_session();
00320 return;
00321 }
00322
00323
00324 if (send_response(ret) != 0) {
00325 return;
00326 }
00327
00328
00329
00330 if (ret == DTN_ECOMM || ret == DTN_EMSGTYPE) {
00331 close_session();
00332 return;
00333 }
00334
00335 }
00336 }
00337
00338
00339 int
00340 APIClient::send_response(int ret)
00341 {
00342 u_int32_t len, msglen;
00343
00344
00345
00346 ASSERT(ret == DTN_SUCCESS ||
00347 (DTN_ERRBASE <= ret && ret <= DTN_ERRMAX));
00348
00349
00350
00351
00352 len = xdr_getpos(&xdr_encode_);
00353 log_debug("building reply: status %s, length %d",
00354 dtnipc_msgtoa(ret), len);
00355
00356 msglen = len + 8;
00357 ret = ntohl(ret);
00358 len = htonl(len);
00359
00360 memcpy(buf_, &ret, sizeof(ret));
00361 memcpy(&buf_[4], &len, sizeof(len));
00362
00363 log_debug("sending %d byte reply message", msglen);
00364 if (writeall(buf_, msglen) != (int)msglen) {
00365 log_err("error sending reply: %s", strerror(errno));
00366 close_session();
00367 return -1;
00368 }
00369
00370 return 0;
00371 }
00372
00373
00374 bool
00375 APIClient::is_bound(u_int32_t regid)
00376 {
00377 APIRegistrationList::iterator iter;
00378 for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
00379 if ((*iter)->regid() == regid) {
00380 return true;
00381 }
00382 }
00383
00384 return false;
00385 }
00386
00387
00388 int
00389 APIClient::handle_local_eid()
00390 {
00391 dtn_service_tag_t service_tag;
00392 dtn_endpoint_id_t local_eid;
00393
00394
00395 if (!xdr_dtn_service_tag_t(&xdr_decode_, &service_tag))
00396 {
00397 log_err("error in xdr unpacking arguments");
00398 return DTN_EXDR;
00399 }
00400
00401
00402 EndpointID eid(BundleDaemon::instance()->local_eid());
00403 if (eid.append_service_tag(service_tag.tag) == false) {
00404 log_err("error appending service tag");
00405 return DTN_EINVAL;
00406 }
00407
00408 memset(&local_eid, 0, sizeof(local_eid));
00409 eid.copyto(&local_eid);
00410
00411
00412 if (!xdr_dtn_endpoint_id_t(&xdr_encode_, &local_eid)) {
00413 log_err("internal error in xdr: xdr_dtn_endpoint_id_t");
00414 return DTN_EXDR;
00415 }
00416
00417 log_debug("get_local_eid encoded %d byte response",
00418 xdr_getpos(&xdr_encode_));
00419
00420 return DTN_SUCCESS;
00421 }
00422
00423
00424 int
00425 APIClient::handle_register()
00426 {
00427 APIRegistration* reg;
00428 Registration::failure_action_t action;
00429 EndpointIDPattern endpoint;
00430 std::string script;
00431
00432 dtn_reg_info_t reginfo;
00433
00434 memset(®info, 0, sizeof(reginfo));
00435
00436
00437 if (!xdr_dtn_reg_info_t(&xdr_decode_, ®info))
00438 {
00439 log_err("error in xdr unpacking arguments");
00440 return DTN_EXDR;
00441 }
00442
00443
00444
00445 oasys::ScopeXDRFree x((xdrproc_t)xdr_dtn_reg_info_t, (char*)®info);
00446
00447 endpoint.assign(®info.endpoint);
00448
00449 if (!endpoint.valid()) {
00450 log_err("invalid endpoint id in register: '%s'",
00451 reginfo.endpoint.uri);
00452 return DTN_EINVAL;
00453 }
00454
00455 switch (reginfo.failure_action) {
00456 case DTN_REG_DEFER: action = Registration::DEFER; break;
00457 case DTN_REG_DROP: action = Registration::DROP; break;
00458 case DTN_REG_EXEC: action = Registration::EXEC; break;
00459 default: {
00460 log_err("invalid failure action code 0x%x", reginfo.failure_action);
00461 return DTN_EINVAL;
00462 }
00463 }
00464
00465 if (action == Registration::EXEC) {
00466 script.assign(reginfo.script.script_val, reginfo.script.script_len);
00467 }
00468
00469 u_int32_t regid = GlobalStore::instance()->next_regid();
00470 reg = new APIRegistration(regid, endpoint, action,
00471 reginfo.expiration, script);
00472
00473 if (! reginfo.init_passive) {
00474
00475 if (! bindings_->empty()) {
00476 log_err("error: handle already bound to a registration");
00477 return DTN_EBUSY;
00478 }
00479
00480
00481 bindings_->push_back(reg);
00482 reg->set_active(true);
00483 }
00484
00485 BundleDaemon::post_and_wait(new RegistrationAddedEvent(reg, EVENTSRC_APP),
00486 ¬ifier_);
00487
00488
00489 if (!xdr_dtn_reg_id_t(&xdr_encode_, ®id)) {
00490 log_err("internal error in xdr: xdr_dtn_reg_id_t");
00491 return DTN_EXDR;
00492 }
00493
00494 return DTN_SUCCESS;
00495 }
00496
00497
00498 int
00499 APIClient::handle_unregister()
00500 {
00501 Registration* reg;
00502 dtn_reg_id_t regid;
00503
00504
00505 if (!xdr_dtn_reg_id_t(&xdr_decode_, ®id))
00506 {
00507 log_err("error in xdr unpacking arguments");
00508 return DTN_EXDR;
00509 }
00510
00511 reg = BundleDaemon::instance()->reg_table()->get(regid);
00512 if (reg == NULL) {
00513 return DTN_ENOTFOUND;
00514 }
00515
00516
00517
00518
00519
00520 if (is_bound(reg->regid()) && reg->active()) {
00521 if (reg->expired()) {
00522 return DTN_EINVAL;
00523 }
00524
00525 reg->force_expire();
00526 ASSERT(reg->expired());
00527 return DTN_SUCCESS;
00528 }
00529
00530
00531
00532 if (reg->active()) {
00533 return DTN_EBUSY;
00534 }
00535
00536 BundleDaemon::post_and_wait(new RegistrationRemovedEvent(reg),
00537 ¬ifier_);
00538
00539 return DTN_SUCCESS;
00540 }
00541
00542
00543 int
00544 APIClient::handle_find_registration()
00545 {
00546 Registration* reg;
00547 EndpointIDPattern endpoint;
00548 dtn_endpoint_id_t app_eid;
00549
00550
00551 if (!xdr_dtn_endpoint_id_t(&xdr_decode_, &app_eid))
00552 {
00553 log_err("error in xdr unpacking arguments");
00554 return DTN_EXDR;
00555 }
00556
00557 endpoint.assign(&app_eid);
00558 if (!endpoint.valid()) {
00559 log_err("invalid endpoint id in find_registration: '%s'",
00560 app_eid.uri);
00561 return DTN_EINVAL;
00562 }
00563
00564 reg = BundleDaemon::instance()->reg_table()->get(endpoint);
00565 if (reg == NULL) {
00566 return DTN_ENOTFOUND;
00567 }
00568
00569 u_int32_t regid = reg->regid();
00570
00571
00572 if (!xdr_dtn_reg_id_t(&xdr_encode_, ®id)) {
00573 log_err("internal error in xdr: xdr_dtn_reg_id_t");
00574 return DTN_EXDR;
00575 }
00576
00577 return DTN_SUCCESS;
00578 }
00579
00580
00581 int
00582 APIClient::handle_bind()
00583 {
00584 dtn_reg_id_t regid;
00585
00586
00587 if (!xdr_dtn_reg_id_t(&xdr_decode_, ®id)) {
00588 log_err("error in xdr unpacking arguments");
00589 return DTN_EXDR;
00590 }
00591
00592
00593 const RegistrationTable* regtable = BundleDaemon::instance()->reg_table();
00594 Registration* reg = regtable->get(regid);
00595
00596 if (!reg) {
00597 log_err("can't find registration %d", regid);
00598 return DTN_ENOTFOUND;
00599 }
00600
00601 APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
00602 if (api_reg == NULL) {
00603 log_crit("registration %d is not an API registration!!",
00604 regid);
00605 return DTN_ENOTFOUND;
00606 }
00607
00608 if (api_reg->active()) {
00609 log_err("registration %d is already in active mode", regid);
00610 return DTN_EBUSY;
00611 }
00612
00613
00614 if (! bindings_->empty()) {
00615 log_err("error: handle already bound to a registration");
00616 return DTN_EBUSY;
00617 }
00618
00619
00620 bindings_->push_back(api_reg);
00621 api_reg->set_active(true);
00622
00623 log_info("DTN_BIND: bound to registration %d", reg->regid());
00624
00625 return DTN_SUCCESS;
00626 }
00627
00628
00629 int
00630 APIClient::handle_unbind()
00631 {
00632 dtn_reg_id_t regid;
00633
00634
00635 if (!xdr_dtn_reg_id_t(&xdr_decode_, ®id)) {
00636 log_err("error in xdr unpacking arguments");
00637 return DTN_EXDR;
00638 }
00639
00640
00641 const RegistrationTable* regtable = BundleDaemon::instance()->reg_table();
00642 Registration* reg = regtable->get(regid);
00643
00644 if (!reg) {
00645 log_err("can't find registration %d", regid);
00646 return DTN_ENOTFOUND;
00647 }
00648
00649 APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
00650 if (api_reg == NULL) {
00651 log_crit("registration %d is not an API registration!!",
00652 regid);
00653 return DTN_ENOTFOUND;
00654 }
00655
00656 APIRegistrationList::iterator iter;
00657 for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
00658 if (*iter == api_reg) {
00659 bindings_->erase(iter);
00660 ASSERT(api_reg->active());
00661 api_reg->set_active(false);
00662
00663 log_info("DTN_UNBIND: unbound from registration %d", regid);
00664 return DTN_SUCCESS;
00665 }
00666 }
00667
00668 log_err("registration %d not bound to this api client", regid);
00669 return DTN_ENOTFOUND;
00670 }
00671
00672
00673 int
00674 APIClient::handle_send()
00675 {
00676 Bundle* b;
00677 dtn_bundle_spec_t spec;
00678 dtn_bundle_payload_t payload;
00679
00680 memset(&spec, 0, sizeof(spec));
00681 memset(&payload, 0, sizeof(payload));
00682
00683
00684 if (!xdr_dtn_bundle_spec_t(&xdr_decode_, &spec) ||
00685 !xdr_dtn_bundle_payload_t(&xdr_decode_, &payload))
00686 {
00687 log_err("error in xdr unpacking arguments");
00688 return DTN_EXDR;
00689 }
00690
00691 b = new Bundle();
00692
00693
00694 oasys::ScopeMalloc m((payload.location == DTN_PAYLOAD_MEM) ?
00695 payload.dtn_bundle_payload_t_u.buf.buf_val :
00696 payload.dtn_bundle_payload_t_u.filename.filename_val);
00697
00698
00699 b->source_.assign(&spec.source);
00700 b->dest_.assign(&spec.dest);
00701 if (spec.replyto.uri[0] == '\0') {
00702 b->replyto_.assign(EndpointID::NULL_EID());
00703 } else {
00704 b->replyto_.assign(&spec.replyto);
00705 }
00706 b->custodian_.assign(EndpointID::NULL_EID());
00707
00708 oasys::StringBuffer error;
00709 if (!b->validate(&error)) {
00710 log_err("bundle validation failed: %s", error.data());
00711 return DTN_EINVAL;
00712 }
00713
00714
00715 switch (spec.priority) {
00716 #define COS(_cos) case _cos: b->priority_ = Bundle::_cos; break;
00717 COS(COS_BULK);
00718 COS(COS_NORMAL);
00719 COS(COS_EXPEDITED);
00720 COS(COS_RESERVED);
00721 #undef COS
00722 default:
00723 log_err("invalid priority level %d", (int)spec.priority);
00724 return DTN_EINVAL;
00725 };
00726
00727
00728 if (spec.dopts & DOPTS_CUSTODY)
00729 b->custody_requested_ = true;
00730
00731 if (spec.dopts & DOPTS_DELIVERY_RCPT)
00732 b->delivery_rcpt_ = true;
00733
00734 if (spec.dopts & DOPTS_RECEIVE_RCPT)
00735 b->receive_rcpt_ = true;
00736
00737 if (spec.dopts & DOPTS_FORWARD_RCPT)
00738 b->forward_rcpt_ = true;
00739
00740 if (spec.dopts & DOPTS_CUSTODY_RCPT)
00741 b->custody_rcpt_ = true;
00742
00743 if (spec.dopts & DOPTS_DELETE_RCPT)
00744 b->deletion_rcpt_ = true;
00745
00746
00747 b->expiration_ = spec.expiration;
00748
00749
00750 size_t payload_len;
00751 switch ( payload.location ) {
00752 case DTN_PAYLOAD_MEM:
00753 b->payload_.set_data((u_char*)payload.dtn_bundle_payload_t_u.buf.buf_val,
00754 payload.dtn_bundle_payload_t_u.buf.buf_len);
00755 payload_len = payload.dtn_bundle_payload_t_u.buf.buf_len;
00756 break;
00757
00758 case DTN_PAYLOAD_FILE:
00759 char filename[PATH_MAX];
00760 FILE * file;
00761 struct stat finfo;
00762 int r, left;
00763 u_char buffer[4096];
00764
00765 sprintf(filename, "%.*s",
00766 (int)payload.dtn_bundle_payload_t_u.filename.filename_len,
00767 payload.dtn_bundle_payload_t_u.filename.filename_val);
00768
00769 if (stat(filename, &finfo) || (file = fopen(filename, "r")) == NULL)
00770 {
00771 log_err("payload file %s does not exist!", filename);
00772 return DTN_EINVAL;
00773 }
00774
00775 payload_len = finfo.st_size;
00776 b->payload_.set_length(payload_len);
00777
00778 b->payload_.reopen_file();
00779 left = payload_len;
00780 r = 0;
00781 while (left > 0)
00782 {
00783 r = fread(buffer, 1, (left>4096)?4096:left, file);
00784
00785 if (r)
00786 {
00787 b->payload_.append_data(buffer, r);
00788 left -= r;
00789 }
00790 else
00791 {
00792 sleep(1);
00793 }
00794 }
00795
00796 fclose(file);
00797 b->payload_.close_file();
00798 break;
00799
00800 default:
00801 log_err("payload.location of %d unknown", payload.location);
00802 return DTN_EINVAL;
00803 }
00804
00805
00806 dtn_bundle_id_t id;
00807 memcpy(&id.source, &spec.source, sizeof(dtn_endpoint_id_t));
00808 id.creation_secs = b->creation_ts_.seconds_;
00809 id.creation_subsecs = b->creation_ts_.seqno_;
00810
00811 log_info("DTN_SEND bundle *%p", b);
00812
00813
00814
00815 BundleDaemon::post_and_wait(new BundleReceivedEvent(b, EVENTSRC_APP),
00816 ¬ifier_);
00817
00818
00819 if (!xdr_dtn_bundle_id_t(&xdr_encode_, &id)) {
00820 log_err("internal error in xdr: xdr_dtn_bundle_id_t");
00821 return DTN_EXDR;
00822 }
00823
00824 return DTN_SUCCESS;
00825 }
00826
00827
00828
00829 #define DTN_FILE_DELIVERY_BUF_SIZE 1000
00830
00831
00832 int
00833 APIClient::handle_recv()
00834 {
00835 dtn_bundle_spec_t spec;
00836 dtn_bundle_payload_t payload;
00837 dtn_bundle_payload_location_t location;
00838 dtn_timeval_t timeout;
00839 oasys::ScratchBuffer<u_char*> buf;
00840 APIRegistration* reg = NULL;
00841 bool sock_ready = false;
00842
00843
00844 if ((!xdr_dtn_bundle_payload_location_t(&xdr_decode_, &location)) ||
00845 (!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
00846 {
00847 log_err("error in xdr unpacking arguments");
00848 return DTN_EXDR;
00849 }
00850
00851 int err = wait_for_bundle("recv", timeout, ®, &sock_ready);
00852 if (err != 0) {
00853 return err;
00854 }
00855
00856
00857
00858
00859 if (sock_ready) {
00860 log_debug("handle_recv: api socket ready -- trying to read one byte");
00861 char b;
00862 if (read(&b, 1) != 0) {
00863 log_err("handle_recv: protocol error -- "
00864 "data arrived or error while blocked in recv");
00865 return DTN_ECOMM;
00866 }
00867
00868 log_info("IPC socket closed while blocked in read... "
00869 "application must have exited");
00870 return -1;
00871 }
00872
00873 BundleRef bref("APIClient::handle_recv");
00874 bref = reg->bundle_list()->pop_front();
00875 Bundle* b = bref.object();
00876 ASSERT(b != NULL);
00877
00878 log_debug("handle_recv: popped bundle %d for registration %d (timeout %d)",
00879 b->bundleid_, reg->regid(), timeout);
00880
00881 memset(&spec, 0, sizeof(spec));
00882 memset(&payload, 0, sizeof(payload));
00883
00884
00885
00886 b->source_.copyto(&spec.source);
00887 b->dest_.copyto(&spec.dest);
00888 b->replyto_.copyto(&spec.replyto);
00889
00890
00891
00892
00893 payload.location = location;
00894
00895 if (location == DTN_PAYLOAD_MEM) {
00896
00897
00898
00899 size_t payload_len = b->payload_.length();
00900 payload.dtn_bundle_payload_t_u.buf.buf_len = payload_len;
00901 if (payload_len != 0) {
00902 buf.reserve(payload_len);
00903 payload.dtn_bundle_payload_t_u.buf.buf_val =
00904 (char*)b->payload_.read_data(0, payload_len, buf.buf());
00905 } else {
00906 payload.dtn_bundle_payload_t_u.buf.buf_val = 0;
00907 }
00908
00909 } else if (location == DTN_PAYLOAD_FILE) {
00910 oasys::FileIOClient tmpfile;
00911 char *tdir, templ[64];
00912
00913 tdir = getenv("TMP");
00914 if (tdir == NULL) {
00915 tdir = getenv("TEMP");
00916 }
00917 if (tdir == NULL) {
00918 tdir = "/tmp";
00919 }
00920
00921 snprintf(templ, sizeof(templ), "%s/bundlePayload_XXXXXX", tdir);
00922
00923 if (tmpfile.mkstemp(templ) == -1) {
00924 log_err("can't open temporary file to deliver bundle");
00925 return DTN_EINTERNAL;
00926 }
00927
00928 if (b->payload_.location() == BundlePayload::MEMORY) {
00929 tmpfile.writeall((char*)b->payload_.memory_data(),
00930 b->payload_.length());
00931
00932 } else {
00933 b->payload_.copy_file(&tmpfile);
00934 }
00935
00936 payload.dtn_bundle_payload_t_u.filename.filename_val = (char*)tmpfile.path();
00937 payload.dtn_bundle_payload_t_u.filename.filename_len = tmpfile.path_len();
00938 tmpfile.close();
00939
00940 } else {
00941 log_err("payload location %d not understood", location);
00942 return DTN_EINVAL;
00943 }
00944
00945 if (!xdr_dtn_bundle_spec_t(&xdr_encode_, &spec))
00946 {
00947 log_err("internal error in xdr: xdr_dtn_bundle_spec_t");
00948 return DTN_EXDR;
00949 }
00950
00951 if (!xdr_dtn_bundle_payload_t(&xdr_encode_, &payload))
00952 {
00953 log_err("internal error in xdr: xdr_dtn_bundle_payload_t");
00954 return DTN_EXDR;
00955 }
00956
00957 log_info("DTN_RECV: "
00958 "successfully delivered bundle %d to registration %d",
00959 b->bundleid_, reg->regid());
00960
00961 BundleDaemon::post(new BundleDeliveredEvent(b, reg));
00962
00963 return DTN_SUCCESS;
00964 }
00965
00966
00967 int
00968 APIClient::handle_begin_poll()
00969 {
00970 dtn_timeval_t timeout;
00971 APIRegistration* reg = NULL;
00972 bool sock_ready = false;
00973
00974
00975 if ((!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
00976 {
00977 log_err("error in xdr unpacking arguments");
00978 return DTN_EXDR;
00979 }
00980
00981 int err = wait_for_bundle("poll", timeout, ®, &sock_ready);
00982 if (err != 0) {
00983 return err;
00984 }
00985
00986
00987
00988 if (sock_ready) {
00989 log_debug("handle_begin_poll: "
00990 "api socket ready -- trying to read one byte");
00991 char type;
00992
00993 int ret = read(&type, 1);
00994 if (ret == 0) {
00995 log_info("IPC socket closed while blocked in read... "
00996 "application must have exited");
00997 return -1;
00998 }
00999
01000 if (ret == -1) {
01001 log_err("handle_begin_poll: protocol error -- "
01002 "error while blocked in poll");
01003 return DTN_ECOMM;
01004 }
01005
01006 if (type != DTN_CANCEL_POLL) {
01007 log_err("handle_poll: error got unexpected message '%s' "
01008 "while blocked in poll", dtnipc_msgtoa(type));
01009 return DTN_ECOMM;
01010 }
01011
01012
01013 u_int32_t len;
01014 ret = read((char*)&len, 4);
01015 if (ret != 4 || len != 0) {
01016 log_err("handle_begin_poll: protocol error -- "
01017 "error getting cancel poll length");
01018 return DTN_ECOMM;
01019 }
01020
01021
01022
01023
01024 send_response(DTN_SUCCESS);
01025 }
01026
01027 return DTN_SUCCESS;
01028 }
01029
01030
01031 int
01032 APIClient::handle_cancel_poll()
01033 {
01034
01035
01036
01037
01038
01039 return DTN_SUCCESS;
01040 }
01041
01042
01043 int
01044 APIClient::wait_for_bundle(const char* operation, dtn_timeval_t dtn_timeout,
01045 APIRegistration** regp, bool* sock_ready)
01046 {
01047 APIRegistration* reg;
01048
01049
01050
01051
01052
01053 if (bindings_->empty()) {
01054 log_err("wait_for_bundle(%s): no bound registration",
01055 operation);
01056 return DTN_EINVAL;
01057 }
01058
01059 reg = bindings_->front();
01060
01061
01062 if (reg->bundle_list()->size() != 0) {
01063 log_debug("wait_for_bundle(%s): "
01064 "immediately returning bundle for reg %d",
01065 operation, reg->regid());
01066 *regp = reg;
01067 return 0;
01068 }
01069
01070 int timeout = (int)dtn_timeout;
01071 if (timeout < -1) {
01072 log_err("wait_for_bundle(%s): "
01073 "invalid timeout value %d", operation, timeout);
01074 return DTN_EINVAL;
01075 }
01076
01077 struct pollfd pollfds[2];
01078
01079 struct pollfd* bundle_poll = &pollfds[0];
01080 bundle_poll->fd = reg->bundle_list()->notifier()->read_fd();
01081 bundle_poll->events = POLLIN;
01082 bundle_poll->revents = 0;
01083
01084 struct pollfd* sock_poll = &pollfds[1];
01085 sock_poll->fd = TCPClient::fd_;
01086 sock_poll->events = POLLIN | POLLERR;
01087 sock_poll->revents = 0;
01088
01089 log_debug("wait_for_bundle(%s): "
01090 "blocking to get bundle for registration %d (timeout %d)",
01091 operation, reg->regid(), timeout);
01092 int nready = oasys::IO::poll_multiple(&pollfds[0], 2, timeout,
01093 NULL, logpath_);
01094
01095 if (nready == oasys::IOTIMEOUT) {
01096 log_debug("wait_for_bundle(%s): timeout waiting for bundle",
01097 operation);
01098 return DTN_ETIMEOUT;
01099
01100 } else if (nready <= 0) {
01101 log_err("wait_for_bundle(%s): unexpected error polling for bundle",
01102 operation);
01103 return DTN_EINTERNAL;
01104 }
01105
01106 ASSERT(nready == 1);
01107
01108
01109
01110
01111 if (sock_poll->revents != 0) {
01112 *sock_ready = true;
01113 return 0;
01114 }
01115
01116
01117 if (bundle_poll->revents == 0) {
01118 log_crit("wait_for_bundle(%s): unexpected error polling for bundle: "
01119 "neither file descriptor is ready", operation);
01120 return DTN_EINTERNAL;
01121 }
01122
01123 if (reg->bundle_list()->size() == 0) {
01124 log_err("wait_for_bundle(%s): "
01125 "bundle list returned ready but no bundle on queue!!",
01126 operation);
01127 return DTN_EINTERNAL;
01128 }
01129
01130 *regp = reg;
01131 return 0;
01132 }
01133
01134
01135 int
01136 APIClient::handle_close()
01137 {
01138 log_info("received DTN_CLOSE message; closing API handle");
01139
01140 return -1;
01141 }
01142
01143 }