00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018 #include <sys/stat.h>
00019 #include <oasys/compat/inet_aton.h>
00020 #include <oasys/io/FileIOClient.h>
00021 #include <oasys/io/NetUtils.h>
00022 #include <oasys/util/Pointers.h>
00023 #include <oasys/util/ScratchBuffer.h>
00024 #include <oasys/util/XDRUtils.h>
00025
00026 #include "APIServer.h"
00027 #include "bundling/APIBlockProcessor.h"
00028 #include "bundling/Bundle.h"
00029 #include "bundling/BundleEvent.h"
00030 #include "bundling/BundleDaemon.h"
00031 #include "bundling/BundleStatusReport.h"
00032 #include "bundling/SDNV.h"
00033 #include "cmd/APICommand.h"
00034 #include "reg/APIRegistration.h"
00035 #include "reg/RegistrationTable.h"
00036 #include "routing/BundleRouter.h"
00037 #include "storage/GlobalStore.h"
00038
// Fallback two-argument minimum, defined only when no earlier header has
// already supplied MIN. As a macro, either argument may be evaluated twice.
#if !defined(MIN)
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
#endif
00042
00043
#ifdef __CYGWIN__

// Cygwin-only compatibility shims for the XDR routines used in this file.
// NOTE(review): presumably the Cygwin RPC headers do not declare these in a
// form usable from C++ -- confirm against the target headers.

// Explicit C-linkage prototype for xdrmem_create().
extern "C" {
extern void xdrmem_create(XDR *__xdrs, __const caddr_t __addr,
                          u_int __size, enum xdr_op __xop);
}

// Each accessor below is redefined to cast the corresponding x_ops function
// pointer to a fully prototyped type before calling through it.

typedef void (*xdr_setpos_t)(XDR *, int);
#undef xdr_setpos
#define xdr_setpos(stream, offset) \
    ((xdr_setpos_t)(*(stream)->x_ops->x_setpostn))(stream, offset)

typedef int (*xdr_getpos_t)(XDR *);
#undef xdr_getpos
#define xdr_getpos(stream) \
    ((xdr_getpos_t)(*(stream)->x_ops->x_getpostn))(stream)

typedef int (*xdr_putlong_t)(XDR *, long *);
#undef xdr_putlong
#define xdr_putlong(stream, valuep) \
    ((xdr_putlong_t)(*(stream)->x_ops->x_putlong))(stream, valuep)

#endif // __CYGWIN__
00071
00072 namespace dtn {
00073
00074
00075 APIServer::APIServer()
00076 : TCPServerThread("APIServer", "/dtn/apiserver", DELETE_ON_EXIT)
00077 {
00078 local_addr_ = htonl(INADDR_LOOPBACK);
00079 local_port_ = DTN_IPC_PORT;
00080
00081
00082 char *env;
00083 if ((env = getenv("DTNAPI_ADDR")) != NULL) {
00084 if (inet_aton(env, (struct in_addr*)&local_addr_) == 0)
00085 {
00086 log_err("DTNAPI_ADDR environment variable (%s) "
00087 "not a valid ip address, using default of localhost",
00088 env);
00089
00090 local_addr_ = htonl(INADDR_LOOPBACK);
00091 } else {
00092 log_debug("local address set to %s by DTNAPI_ADDR "
00093 "environment variable", env);
00094 }
00095 }
00096
00097 if ((env = getenv("DTNAPI_PORT")) != NULL) {
00098 char *end;
00099 u_int port = strtoul(env, &end, 10);
00100 if (*end != '\0' || port > 0xffff)
00101 {
00102 log_err("DTNAPI_PORT environment variable (%s) "
00103 "not a valid ip port, using default of %d",
00104 env, DTN_IPC_PORT);
00105 port = DTN_IPC_PORT;
00106 } else {
00107 log_debug("api port set to %s by DTNAPI_PORT "
00108 "environment variable", env);
00109 }
00110 local_port_ = (u_int16_t)port;
00111 }
00112
00113 if (local_addr_ != INADDR_ANY || local_port_ != 0) {
00114 log_debug("APIServer init (evironment set addr %s port %d)",
00115 intoa(local_addr_), local_port_);
00116 } else {
00117 log_debug("APIServer init");
00118 }
00119
00120 oasys::TclCommandInterp::instance()->reg(new APICommand(this));
00121 }
00122
00123
00124 void
00125 APIServer::accepted(int fd, in_addr_t addr, u_int16_t port)
00126 {
00127 APIClient* c = new APIClient(fd, addr, port);
00128 c->start();
00129 }
00130
00131
00132 APIClient::APIClient(int fd, in_addr_t addr, u_int16_t port)
00133 : Thread("APIClient", DELETE_ON_EXIT),
00134 TCPClient(fd, addr, port, "/dtn/apiclient"),
00135 notifier_(logpath_)
00136 {
00137
00138 xdrmem_create(&xdr_encode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_ENCODE);
00139 xdrmem_create(&xdr_decode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_DECODE);
00140
00141 bindings_ = new APIRegistrationList();
00142 }
00143
00144
00145 APIClient::~APIClient()
00146 {
00147 log_debug("client destroyed");
00148 delete_z(bindings_);
00149 }
00150
00151
00152 void
00153 APIClient::close_session()
00154 {
00155 TCPClient::close();
00156
00157 APIRegistration* reg;
00158 while (! bindings_->empty()) {
00159 reg = bindings_->front();
00160 bindings_->pop_front();
00161
00162 reg->set_active(false);
00163
00164 if (reg->expired()) {
00165 log_debug("removing expired registration %d", reg->regid());
00166 BundleDaemon::post(new RegistrationExpiredEvent(reg->regid()));
00167 }
00168 }
00169 }
00170
00171
00172 int
00173 APIClient::handle_handshake()
00174 {
00175 u_int32_t handshake;
00176 u_int16_t message_type, ipc_version;
00177
00178 int ret = readall((char*)&handshake, sizeof(handshake));
00179 if (ret != sizeof(handshake)) {
00180 log_err("error reading handshake: (got %d/%zu), \"error\" %s",
00181 ret, sizeof(handshake), strerror(errno));
00182 return -1;
00183 }
00184
00185 message_type = ntohl(handshake) >> 16;
00186 ipc_version = (u_int16_t) (ntohl(handshake) & 0x0ffff);
00187
00188 if (message_type != DTN_OPEN) {
00189 log_err("handshake (%d)'s message type %d != DTN_OPEN (%d)",
00190 handshake, message_type, DTN_OPEN);
00191 return -1;
00192 }
00193
00194 if (ipc_version != DTN_IPC_VERSION) {
00195 log_err("handshake (%d)'s version %d != DTN_IPC_VERSION (%d)",
00196 handshake, ipc_version, DTN_IPC_VERSION);
00197 return -1;
00198 }
00199
00200 ret = writeall((char*)&handshake, sizeof(handshake));
00201 if (ret != sizeof(handshake)) {
00202 log_err("error writing handshake: %s", strerror(errno));
00203 return -1;
00204 }
00205
00206 return 0;
00207 }
00208
00209
00210 void
00211 APIClient::run()
00212 {
00213 int ret;
00214 u_int8_t type;
00215 u_int32_t len;
00216
00217 log_info("new session %s:%d -> %s:%d",
00218 intoa(local_addr()), local_port(),
00219 intoa(remote_addr()), remote_port());
00220
00221 if (handle_handshake() != 0) {
00222 close_session();
00223 return;
00224 }
00225
00226 while (true) {
00227 xdr_setpos(&xdr_encode_, 0);
00228 xdr_setpos(&xdr_decode_, 0);
00229
00230
00231
00232
00233
00234 ret = read(&buf_[3], DTN_MAX_API_MSG);
00235
00236 if (ret <= 0) {
00237 log_warn("client error or disconnection");
00238 close_session();
00239 return;
00240 }
00241
00242 if (ret < 5) {
00243 log_err("ack!! can't handle really short read...");
00244 close_session();
00245 return;
00246 }
00247
00248
00249
00250
00251
00252 type = buf_[3];
00253 memcpy(&len, &buf_[4], sizeof(len));
00254
00255 len = ntohl(len);
00256
00257 ret -= 5;
00258 log_debug("got %s (%d/%d bytes)", dtnipc_msgtoa(type), ret, len);
00259
00260
00261
00262 if (ret < (int)len) {
00263 int toget = len - ret;
00264 if (readall(&buf_[8 + ret], toget) != toget) {
00265 log_err("error reading message remainder: %s",
00266 strerror(errno));
00267 close_session();
00268 return;
00269 }
00270 }
00271
00272
00273 switch(type) {
00274 #define DISPATCH(_type, _fn) \
00275 case _type: \
00276 ret = _fn(); \
00277 break;
00278
00279 DISPATCH(DTN_LOCAL_EID, handle_local_eid);
00280 DISPATCH(DTN_REGISTER, handle_register);
00281 DISPATCH(DTN_UNREGISTER, handle_unregister);
00282 DISPATCH(DTN_FIND_REGISTRATION, handle_find_registration);
00283 DISPATCH(DTN_SEND, handle_send);
00284 DISPATCH(DTN_BIND, handle_bind);
00285 DISPATCH(DTN_UNBIND, handle_unbind);
00286 DISPATCH(DTN_RECV, handle_recv);
00287 DISPATCH(DTN_BEGIN_POLL, handle_begin_poll);
00288 DISPATCH(DTN_CANCEL_POLL, handle_cancel_poll);
00289 DISPATCH(DTN_CLOSE, handle_close);
00290 #undef DISPATCH
00291
00292 default:
00293 log_err("unknown message type code 0x%x", type);
00294 ret = DTN_EMSGTYPE;
00295 break;
00296 }
00297
00298
00299
00300 if (ret == -1) {
00301 close_session();
00302 return;
00303 }
00304
00305
00306 if (send_response(ret) != 0) {
00307 return;
00308 }
00309
00310
00311
00312 if (ret == DTN_ECOMM || ret == DTN_EMSGTYPE) {
00313 close_session();
00314 return;
00315 }
00316
00317 }
00318 }
00319
00320
00321 int
00322 APIClient::send_response(int ret)
00323 {
00324 u_int32_t len, msglen;
00325
00326
00327
00328 ASSERT(ret == DTN_SUCCESS ||
00329 (DTN_ERRBASE <= ret && ret <= DTN_ERRMAX));
00330
00331
00332
00333
00334 len = xdr_getpos(&xdr_encode_);
00335 log_debug("building reply: status %s, length %d",
00336 dtnipc_msgtoa(ret), len);
00337
00338 msglen = len + 8;
00339 ret = ntohl(ret);
00340 len = htonl(len);
00341
00342 memcpy(buf_, &ret, sizeof(ret));
00343 memcpy(&buf_[4], &len, sizeof(len));
00344
00345 log_debug("sending %d byte reply message", msglen);
00346 if (writeall(buf_, msglen) != (int)msglen) {
00347 log_err("error sending reply: %s", strerror(errno));
00348 close_session();
00349 return -1;
00350 }
00351
00352 return 0;
00353 }
00354
00355
00356 bool
00357 APIClient::is_bound(u_int32_t regid)
00358 {
00359 APIRegistrationList::iterator iter;
00360 for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
00361 if ((*iter)->regid() == regid) {
00362 return true;
00363 }
00364 }
00365
00366 return false;
00367 }
00368
00369
00370 int
00371 APIClient::handle_local_eid()
00372 {
00373 dtn_service_tag_t service_tag;
00374 dtn_endpoint_id_t local_eid;
00375
00376
00377 if (!xdr_dtn_service_tag_t(&xdr_decode_, &service_tag))
00378 {
00379 log_err("error in xdr unpacking arguments");
00380 return DTN_EXDR;
00381 }
00382
00383
00384 EndpointID eid(BundleDaemon::instance()->local_eid());
00385 if (eid.append_service_tag(service_tag.tag) == false) {
00386 log_err("error appending service tag");
00387 return DTN_EINVAL;
00388 }
00389
00390 memset(&local_eid, 0, sizeof(local_eid));
00391 eid.copyto(&local_eid);
00392
00393
00394 if (!xdr_dtn_endpoint_id_t(&xdr_encode_, &local_eid)) {
00395 log_err("internal error in xdr: xdr_dtn_endpoint_id_t");
00396 return DTN_EXDR;
00397 }
00398
00399 log_debug("get_local_eid encoded %d byte response",
00400 xdr_getpos(&xdr_encode_));
00401
00402 return DTN_SUCCESS;
00403 }
00404
00405
00406 int
00407 APIClient::handle_register()
00408 {
00409 APIRegistration* reg;
00410 Registration::failure_action_t action;
00411 EndpointIDPattern endpoint;
00412 std::string script;
00413
00414 dtn_reg_info_t reginfo;
00415
00416 memset(®info, 0, sizeof(reginfo));
00417
00418
00419 if (!xdr_dtn_reg_info_t(&xdr_decode_, ®info))
00420 {
00421 log_err("error in xdr unpacking arguments");
00422 return DTN_EXDR;
00423 }
00424
00425
00426
00427 oasys::ScopeXDRFree x((xdrproc_t)xdr_dtn_reg_info_t, (char*)®info);
00428
00429 endpoint.assign(®info.endpoint);
00430
00431 if (!endpoint.valid()) {
00432 log_err("invalid endpoint id in register: '%s'",
00433 reginfo.endpoint.uri);
00434 return DTN_EINVAL;
00435 }
00436
00437 switch (reginfo.failure_action) {
00438 case DTN_REG_DEFER: action = Registration::DEFER; break;
00439 case DTN_REG_DROP: action = Registration::DROP; break;
00440 case DTN_REG_EXEC: action = Registration::EXEC; break;
00441 default: {
00442 log_err("invalid failure action code 0x%x", reginfo.failure_action);
00443 return DTN_EINVAL;
00444 }
00445 }
00446
00447 if (action == Registration::EXEC) {
00448 script.assign(reginfo.script.script_val, reginfo.script.script_len);
00449 }
00450
00451 u_int32_t regid = GlobalStore::instance()->next_regid();
00452 reg = new APIRegistration(regid, endpoint, action,
00453 reginfo.expiration, script);
00454
00455 if (! reginfo.init_passive) {
00456
00457 if (! bindings_->empty()) {
00458 log_err("error: handle already bound to a registration");
00459 return DTN_EBUSY;
00460 }
00461
00462
00463 bindings_->push_back(reg);
00464 reg->set_active(true);
00465 }
00466
00467 BundleDaemon::post_and_wait(new RegistrationAddedEvent(reg, EVENTSRC_APP),
00468 ¬ifier_);
00469
00470
00471 if (!xdr_dtn_reg_id_t(&xdr_encode_, ®id)) {
00472 log_err("internal error in xdr: xdr_dtn_reg_id_t");
00473 return DTN_EXDR;
00474 }
00475
00476 return DTN_SUCCESS;
00477 }
00478
00479
/**
 * Handler for DTN_UNREGISTER: remove the registration with the given id.
 *
 * Three cases:
 *  - The registration is bound to this session and active: it is
 *    force-expired rather than removed (close_session() posts the expiry
 *    event for expired bindings). DTN_EINVAL is returned if it had already
 *    expired.
 *  - The registration is active but not bound here: DTN_EBUSY.
 *  - Otherwise a RegistrationRemovedEvent is posted and waited on.
 *
 * @return DTN_SUCCESS, DTN_EXDR, DTN_ENOTFOUND, DTN_EINVAL or DTN_EBUSY
 */
int
APIClient::handle_unregister()
{
    Registration* reg;
    dtn_reg_id_t regid;

    // decode the request argument
    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid))
    {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    reg = BundleDaemon::instance()->reg_table()->get(regid);
    if (reg == NULL) {
        return DTN_ENOTFOUND;
    }

    // bound to this session and in use: expire it instead of removing it
    if (is_bound(reg->regid()) && reg->active()) {
        if (reg->expired()) {
            return DTN_EINVAL;
        }

        reg->force_expire();
        ASSERT(reg->expired());
        return DTN_SUCCESS;
    }

    // active but not one of this session's bindings
    if (reg->active()) {
        return DTN_EBUSY;
    }

    BundleDaemon::post_and_wait(new RegistrationRemovedEvent(reg),
                                &notifier_);

    return DTN_SUCCESS;
}
00523
00524
/**
 * Handler for DTN_FIND_REGISTRATION: look up a registration by endpoint id
 * and reply with its registration id.
 *
 * @return DTN_SUCCESS, DTN_EXDR on an XDR failure, DTN_EINVAL for an
 *         invalid endpoint id, or DTN_ENOTFOUND if the table has no match
 */
int
APIClient::handle_find_registration()
{
    Registration* reg;
    EndpointIDPattern endpoint;
    dtn_endpoint_id_t app_eid;

    // decode the request argument
    if (!xdr_dtn_endpoint_id_t(&xdr_decode_, &app_eid))
    {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    endpoint.assign(&app_eid);
    if (!endpoint.valid()) {
        log_err("invalid endpoint id in find_registration: '%s'",
                app_eid.uri);
        return DTN_EINVAL;
    }

    reg = BundleDaemon::instance()->reg_table()->get(endpoint);
    if (reg == NULL) {
        return DTN_ENOTFOUND;
    }

    u_int32_t regid = reg->regid();

    // encode the reply
    if (!xdr_dtn_reg_id_t(&xdr_encode_, &regid)) {
        log_err("internal error in xdr: xdr_dtn_reg_id_t");
        return DTN_EXDR;
    }

    return DTN_SUCCESS;
}
00561
00562
/**
 * Handler for DTN_BIND: bind this session to an existing registration and
 * mark that registration active.
 *
 * @return DTN_SUCCESS; DTN_EXDR on a decode failure; DTN_ENOTFOUND if the
 *         id is unknown or does not refer to an APIRegistration; DTN_EBUSY
 *         if the registration is already active or this session already
 *         has a binding
 */
int
APIClient::handle_bind()
{
    dtn_reg_id_t regid;

    // decode the request argument
    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid)) {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    // look up the registration
    const RegistrationTable* regtable = BundleDaemon::instance()->reg_table();
    Registration* reg = regtable->get(regid);

    if (!reg) {
        log_err("can't find registration %d", regid);
        return DTN_ENOTFOUND;
    }

    // only API registrations can be bound to a client session
    APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
    if (api_reg == NULL) {
        log_crit("registration %d is not an API registration!!",
                 regid);
        return DTN_ENOTFOUND;
    }

    if (api_reg->active()) {
        log_err("registration %d is already in active mode", regid);
        return DTN_EBUSY;
    }

    // a session may hold at most one binding
    if (! bindings_->empty()) {
        log_err("error: handle already bound to a registration");
        return DTN_EBUSY;
    }

    bindings_->push_back(api_reg);
    api_reg->set_active(true);

    log_info("DTN_BIND: bound to registration %d", reg->regid());

    return DTN_SUCCESS;
}
00609
00610
/**
 * Handler for DTN_UNBIND: detach this session from one of its bound
 * registrations and mark that registration inactive.
 *
 * @return DTN_SUCCESS; DTN_EXDR on a decode failure; DTN_ENOTFOUND if the
 *         id is unknown, is not an APIRegistration, or is not bound to
 *         this session
 */
int
APIClient::handle_unbind()
{
    dtn_reg_id_t regid;

    // decode the request argument
    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid)) {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    // look up the registration
    const RegistrationTable* regtable = BundleDaemon::instance()->reg_table();
    Registration* reg = regtable->get(regid);

    if (!reg) {
        log_err("can't find registration %d", regid);
        return DTN_ENOTFOUND;
    }

    APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
    if (api_reg == NULL) {
        log_crit("registration %d is not an API registration!!",
                 regid);
        return DTN_ENOTFOUND;
    }

    // search this session's bindings for the registration
    APIRegistrationList::iterator iter;
    for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
        if (*iter == api_reg) {
            // returning right after the erase, so the invalidated
            // iterator is never used again
            bindings_->erase(iter);
            ASSERT(api_reg->active());
            api_reg->set_active(false);

            log_info("DTN_UNBIND: unbound from registration %d", regid);
            return DTN_SUCCESS;
        }
    }

    log_err("registration %d not bound to this api client", regid);
    return DTN_ENOTFOUND;
}
00653
00654
00655 int
00656 APIClient::handle_send()
00657 {
00658 dtn_bundle_spec_t spec;
00659 dtn_bundle_payload_t payload;
00660
00661 memset(&spec, 0, sizeof(spec));
00662 memset(&payload, 0, sizeof(payload));
00663
00664
00665 if (!xdr_dtn_bundle_spec_t(&xdr_decode_, &spec) ||
00666 !xdr_dtn_bundle_payload_t(&xdr_decode_, &payload))
00667 {
00668 log_err("error in xdr unpacking arguments");
00669 return DTN_EXDR;
00670 }
00671
00672 BundleRef b("APIClient::handle_send");
00673 b = new Bundle();
00674
00675
00676 oasys::ScopeXDRFree f1((xdrproc_t)xdr_dtn_bundle_spec_t,
00677 (char*)&spec);
00678 oasys::ScopeXDRFree f2((xdrproc_t)xdr_dtn_bundle_payload_t,
00679 (char*)&payload);
00680
00681
00682 b->source_.assign(&spec.source);
00683 b->dest_.assign(&spec.dest);
00684 if (spec.replyto.uri[0] == '\0') {
00685 b->replyto_.assign(EndpointID::NULL_EID());
00686 } else {
00687 b->replyto_.assign(&spec.replyto);
00688 }
00689 b->custodian_.assign(EndpointID::NULL_EID());
00690
00691 oasys::StringBuffer error;
00692 if (!b->validate(&error)) {
00693 log_err("bundle validation failed: %s", error.data());
00694 return DTN_EINVAL;
00695 }
00696
00697
00698 switch (spec.priority) {
00699 #define COS(_cos) case _cos: b->priority_ = Bundle::_cos; break;
00700 COS(COS_BULK);
00701 COS(COS_NORMAL);
00702 COS(COS_EXPEDITED);
00703 COS(COS_RESERVED);
00704 #undef COS
00705 default:
00706 log_err("invalid priority level %d", (int)spec.priority);
00707 return DTN_EINVAL;
00708 };
00709
00710
00711 if (spec.dopts & DOPTS_CUSTODY)
00712 b->custody_requested_ = true;
00713
00714 if (spec.dopts & DOPTS_DELIVERY_RCPT)
00715 b->delivery_rcpt_ = true;
00716
00717 if (spec.dopts & DOPTS_RECEIVE_RCPT)
00718 b->receive_rcpt_ = true;
00719
00720 if (spec.dopts & DOPTS_FORWARD_RCPT)
00721 b->forward_rcpt_ = true;
00722
00723 if (spec.dopts & DOPTS_CUSTODY_RCPT)
00724 b->custody_rcpt_ = true;
00725
00726 if (spec.dopts & DOPTS_DELETE_RCPT)
00727 b->deletion_rcpt_ = true;
00728
00729
00730 b->expiration_ = spec.expiration;
00731
00732 for (u_int i = 0; i < spec.blocks.blocks_len; i++) {
00733 dtn_extension_block_t* block = &spec.blocks.blocks_val[i];
00734
00735 b->api_blocks_.push_back(BlockInfo(APIBlockProcessor::instance()));
00736 BlockInfo* info = &b->api_blocks_.back();
00737 APIBlockProcessor::instance()->
00738 init_block(info, block->type, block->flags,
00739 (u_char*)block->data.data_val,
00740 block->data.data_len);
00741 }
00742
00743
00744
00745 size_t payload_len;
00746 char filename[PATH_MAX];
00747
00748 switch (payload.location) {
00749 case DTN_PAYLOAD_MEM:
00750 payload_len = payload.buf.buf_len;
00751 break;
00752
00753 case DTN_PAYLOAD_FILE:
00754 case DTN_PAYLOAD_TEMP_FILE:
00755 struct stat finfo;
00756 sprintf(filename, "%.*s",
00757 (int)payload.filename.filename_len,
00758 payload.filename.filename_val);
00759
00760 if (stat(filename, &finfo) != 0)
00761 {
00762 log_err("payload file %s does not exist!", filename);
00763 return DTN_EINVAL;
00764 }
00765
00766 payload_len = finfo.st_size;
00767 break;
00768
00769 default:
00770 log_err("payload.location of %d unknown", payload.location);
00771 return DTN_EINVAL;
00772 }
00773
00774 b->payload_.set_length(payload_len);
00775
00776
00777
00778 bool result;
00779 int reason;
00780 BundleDaemon::post_and_wait(
00781 new BundleAcceptRequest(b, EVENTSRC_APP, &result, &reason),
00782 ¬ifier_);
00783
00784 if (!result) {
00785 log_info("DTN_SEND bundle not accepted: reason %s",
00786 BundleStatusReport::reason_to_str(reason));
00787
00788 switch (reason) {
00789 case BundleProtocol::REASON_DEPLETED_STORAGE:
00790 return DTN_ENOSPACE;
00791 default:
00792 return DTN_EINTERNAL;
00793 }
00794 }
00795
00796 switch (payload.location) {
00797 case DTN_PAYLOAD_MEM:
00798 b->payload_.set_data((u_char*)payload.buf.buf_val,
00799 payload.buf.buf_len);
00800 break;
00801
00802 case DTN_PAYLOAD_FILE:
00803 FILE* file;
00804 int r, left;
00805 u_char buffer[4096];
00806
00807 if ((file = fopen(filename, "r")) == NULL)
00808 {
00809 log_err("payload file %s can't be opened!", filename);
00810 return DTN_EINVAL;
00811 }
00812
00813 left = payload_len;
00814 r = 0;
00815 while (left > 0)
00816 {
00817 r = fread(buffer, 1, (left>4096)?4096:left, file);
00818
00819 if (r)
00820 {
00821 b->payload_.append_data(buffer, r);
00822 left -= r;
00823 }
00824 else
00825 {
00826 sleep(1);
00827 }
00828 }
00829
00830 fclose(file);
00831 break;
00832
00833 case DTN_PAYLOAD_TEMP_FILE:
00834 if (! b->payload_.replace_with_file(filename)) {
00835 log_err("payload file %s can't be linked or copied",
00836 filename);
00837 return DTN_EINVAL;
00838 }
00839
00840 if (::unlink(filename) != 0) {
00841 log_err("error unlinking payload temp file: %s",
00842 strerror(errno));
00843
00844 }
00845 }
00846
00847
00848 dtn_bundle_id_t id;
00849 memcpy(&id.source, &spec.source, sizeof(dtn_endpoint_id_t));
00850 id.creation_ts.secs = b->creation_ts_.seconds_;
00851 id.creation_ts.seqno = b->creation_ts_.seqno_;
00852
00853 log_info("DTN_SEND bundle *%p", b.object());
00854
00855
00856
00857 BundleDaemon::post_and_wait(new BundleReceivedEvent(b.object(), EVENTSRC_APP),
00858 ¬ifier_);
00859
00860
00861 if (!xdr_dtn_bundle_id_t(&xdr_encode_, &id)) {
00862 log_err("internal error in xdr: xdr_dtn_bundle_id_t");
00863 return DTN_EXDR;
00864 }
00865
00866 return DTN_SUCCESS;
00867 }
00868
00869
00870
00871 #define DTN_FILE_DELIVERY_BUF_SIZE 1000
00872
00873
00874 int
00875 APIClient::handle_recv()
00876 {
00877 dtn_bundle_spec_t spec;
00878 dtn_bundle_payload_t payload;
00879 dtn_bundle_payload_location_t location;
00880 dtn_bundle_status_report_t status_report;
00881 dtn_timeval_t timeout;
00882 oasys::ScratchBuffer<u_char*> buf;
00883 APIRegistration* reg = NULL;
00884 bool sock_ready = false;
00885 oasys::FileIOClient tmpfile;
00886
00887
00888 if ((!xdr_dtn_bundle_payload_location_t(&xdr_decode_, &location)) ||
00889 (!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
00890 {
00891 log_err("error in xdr unpacking arguments");
00892 return DTN_EXDR;
00893 }
00894
00895 int err = wait_for_bundle("recv", timeout, ®, &sock_ready);
00896 if (err != 0) {
00897 return err;
00898 }
00899
00900
00901
00902
00903 if (sock_ready) {
00904 log_debug("handle_recv: api socket ready -- trying to read one byte");
00905 char b;
00906 if (read(&b, 1) != 0) {
00907 log_err("handle_recv: protocol error -- "
00908 "data arrived or error while blocked in recv");
00909 return DTN_ECOMM;
00910 }
00911
00912 log_info("IPC socket closed while blocked in read... "
00913 "application must have exited");
00914 return -1;
00915 }
00916
00917 BundleRef bref("APIClient::handle_recv");
00918 bref = reg->bundle_list()->pop_front();
00919 Bundle* b = bref.object();
00920 ASSERT(b != NULL);
00921
00922 log_debug("handle_recv: popped bundle %d for registration %d (timeout %d)",
00923 b->bundleid_, reg->regid(), timeout);
00924
00925 memset(&spec, 0, sizeof(spec));
00926 memset(&payload, 0, sizeof(payload));
00927 memset(&status_report, 0, sizeof(status_report));
00928
00929
00930
00931 b->source_.copyto(&spec.source);
00932 b->dest_.copyto(&spec.dest);
00933 b->replyto_.copyto(&spec.replyto);
00934
00935 spec.dopts = 0;
00936 if (b->custody_requested_) spec.dopts |= DOPTS_CUSTODY;
00937 if (b->delivery_rcpt_) spec.dopts |= DOPTS_DELIVERY_RCPT;
00938 if (b->receive_rcpt_) spec.dopts |= DOPTS_RECEIVE_RCPT;
00939 if (b->forward_rcpt_) spec.dopts |= DOPTS_FORWARD_RCPT;
00940 if (b->custody_rcpt_) spec.dopts |= DOPTS_CUSTODY_RCPT;
00941 if (b->deletion_rcpt_) spec.dopts |= DOPTS_DELETE_RCPT;
00942
00943 spec.expiration = b->expiration_;
00944
00945
00946
00947
00948 payload.location = location;
00949
00950 if (location == DTN_PAYLOAD_MEM) {
00951
00952
00953
00954 size_t payload_len = b->payload_.length();
00955 payload.buf.buf_len = payload_len;
00956 if (payload_len != 0) {
00957 buf.reserve(payload_len);
00958 payload.buf.buf_val =
00959 (char*)b->payload_.read_data(0, payload_len, buf.buf());
00960 } else {
00961 payload.buf.buf_val = 0;
00962 }
00963
00964 } else if (location == DTN_PAYLOAD_FILE) {
00965 char *tdir, templ[64];
00966
00967
00968
00969 tdir = getenv("TMP");
00970 if (tdir == NULL) {
00971 tdir = getenv("TEMP");
00972 }
00973 if (tdir == NULL) {
00974 tdir = "/tmp";
00975 }
00976
00977 snprintf(templ, sizeof(templ), "%s/bundlePayload_XXXXXX", tdir);
00978
00979 if (tmpfile.mkstemp(templ) == -1) {
00980 log_err("can't open temporary file to deliver bundle");
00981 return DTN_EINTERNAL;
00982 }
00983
00984 if (b->payload_.location() == BundlePayload::MEMORY) {
00985 tmpfile.writeall((char*)b->payload_.memory_data(),
00986 b->payload_.length());
00987
00988 } else {
00989 b->payload_.copy_file(&tmpfile);
00990 }
00991
00992 payload.filename.filename_val = (char*)tmpfile.path();
00993 payload.filename.filename_len = tmpfile.path_len() + 1;
00994 tmpfile.close();
00995
00996 } else {
00997 log_err("payload location %d not understood", location);
00998 return DTN_EINVAL;
00999 }
01000
01001
01002
01003
01004
01005 BundleStatusReport::data_t sr_data;
01006 if (BundleStatusReport::parse_status_report(&sr_data, b))
01007 {
01008 payload.status_report = &status_report;
01009 sr_data.orig_source_eid_.copyto(&status_report.bundle_id.source);
01010 status_report.bundle_id.creation_ts.secs =
01011 sr_data.orig_creation_tv_.seconds_;
01012 status_report.bundle_id.creation_ts.seqno =
01013 sr_data.orig_creation_tv_.seqno_;
01014 status_report.bundle_id.frag_offset = sr_data.orig_frag_offset_;
01015 status_report.bundle_id.orig_length = sr_data.orig_frag_length_;
01016
01017 status_report.reason = (dtn_status_report_reason_t)sr_data.reason_code_;
01018 status_report.flags = (dtn_status_report_flags_t)sr_data.status_flags_;
01019 }
01020
01021 if (!xdr_dtn_bundle_spec_t(&xdr_encode_, &spec))
01022 {
01023 log_err("internal error in xdr: xdr_dtn_bundle_spec_t");
01024 return DTN_EXDR;
01025 }
01026
01027 if (!xdr_dtn_bundle_payload_t(&xdr_encode_, &payload))
01028 {
01029 log_err("internal error in xdr: xdr_dtn_bundle_payload_t");
01030 return DTN_EXDR;
01031 }
01032
01033
01034 payload.status_report = NULL;
01035
01036 log_info("DTN_RECV: "
01037 "successfully delivered bundle %d to registration %d",
01038 b->bundleid_, reg->regid());
01039
01040 BundleDaemon::post(new BundleDeliveredEvent(b, reg));
01041
01042 return DTN_SUCCESS;
01043 }
01044
01045
01046 int
01047 APIClient::handle_begin_poll()
01048 {
01049 dtn_timeval_t timeout;
01050 APIRegistration* reg = NULL;
01051 bool sock_ready = false;
01052
01053
01054 if ((!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
01055 {
01056 log_err("error in xdr unpacking arguments");
01057 return DTN_EXDR;
01058 }
01059
01060 int err = wait_for_bundle("poll", timeout, ®, &sock_ready);
01061 if (err != 0) {
01062 return err;
01063 }
01064
01065
01066
01067 if (sock_ready) {
01068 log_debug("handle_begin_poll: "
01069 "api socket ready -- trying to read one byte");
01070 char type;
01071
01072 int ret = read(&type, 1);
01073 if (ret == 0) {
01074 log_info("IPC socket closed while blocked in read... "
01075 "application must have exited");
01076 return -1;
01077 }
01078
01079 if (ret == -1) {
01080 log_err("handle_begin_poll: protocol error -- "
01081 "error while blocked in poll");
01082 return DTN_ECOMM;
01083 }
01084
01085 if (type != DTN_CANCEL_POLL) {
01086 log_err("handle_poll: error got unexpected message '%s' "
01087 "while blocked in poll", dtnipc_msgtoa(type));
01088 return DTN_ECOMM;
01089 }
01090
01091
01092 u_int32_t len;
01093 ret = read((char*)&len, 4);
01094 if (ret != 4 || len != 0) {
01095 log_err("handle_begin_poll: protocol error -- "
01096 "error getting cancel poll length");
01097 return DTN_ECOMM;
01098 }
01099
01100
01101
01102
01103 send_response(DTN_SUCCESS);
01104 }
01105
01106 return DTN_SUCCESS;
01107 }
01108
01109
01110 int
01111 APIClient::handle_cancel_poll()
01112 {
01113
01114
01115
01116
01117
01118 return DTN_SUCCESS;
01119 }
01120
01121
01122 int
01123 APIClient::wait_for_bundle(const char* operation, dtn_timeval_t dtn_timeout,
01124 APIRegistration** regp, bool* sock_ready)
01125 {
01126 APIRegistration* reg;
01127
01128
01129
01130
01131
01132 if (bindings_->empty()) {
01133 log_err("wait_for_bundle(%s): no bound registration",
01134 operation);
01135 return DTN_EINVAL;
01136 }
01137
01138 reg = bindings_->front();
01139
01140
01141 if (reg->bundle_list()->size() != 0) {
01142 log_debug("wait_for_bundle(%s): "
01143 "immediately returning bundle for reg %d",
01144 operation, reg->regid());
01145 *regp = reg;
01146 return 0;
01147 }
01148
01149 int timeout = (int)dtn_timeout;
01150 if (timeout < -1) {
01151 log_err("wait_for_bundle(%s): "
01152 "invalid timeout value %d", operation, timeout);
01153 return DTN_EINVAL;
01154 }
01155
01156 struct pollfd pollfds[2];
01157
01158 struct pollfd* bundle_poll = &pollfds[0];
01159 bundle_poll->fd = reg->bundle_list()->notifier()->read_fd();
01160 bundle_poll->events = POLLIN;
01161 bundle_poll->revents = 0;
01162
01163 struct pollfd* sock_poll = &pollfds[1];
01164 sock_poll->fd = TCPClient::fd_;
01165 sock_poll->events = POLLIN | POLLERR;
01166 sock_poll->revents = 0;
01167
01168 log_debug("wait_for_bundle(%s): "
01169 "blocking to get bundle for registration %d (timeout %d)",
01170 operation, reg->regid(), timeout);
01171 int nready = oasys::IO::poll_multiple(&pollfds[0], 2, timeout,
01172 NULL, logpath_);
01173
01174 if (nready == oasys::IOTIMEOUT) {
01175 log_debug("wait_for_bundle(%s): timeout waiting for bundle",
01176 operation);
01177 return DTN_ETIMEOUT;
01178
01179 } else if (nready <= 0) {
01180 log_err("wait_for_bundle(%s): unexpected error polling for bundle",
01181 operation);
01182 return DTN_EINTERNAL;
01183 }
01184
01185 ASSERT(nready == 1);
01186
01187
01188
01189
01190 if (sock_poll->revents != 0) {
01191 *sock_ready = true;
01192 return 0;
01193 }
01194
01195
01196 if (bundle_poll->revents == 0) {
01197 log_crit("wait_for_bundle(%s): unexpected error polling for bundle: "
01198 "neither file descriptor is ready", operation);
01199 return DTN_EINTERNAL;
01200 }
01201
01202 if (reg->bundle_list()->size() == 0) {
01203 log_err("wait_for_bundle(%s): "
01204 "bundle list returned ready but no bundle on queue!!",
01205 operation);
01206 return DTN_EINTERNAL;
01207 }
01208
01209 *regp = reg;
01210 return 0;
01211 }
01212
01213
01214 int
01215 APIClient::handle_close()
01216 {
01217 log_info("received DTN_CLOSE message; closing API handle");
01218
01219 return -1;
01220 }
01221
01222 }