APIServer.cc

Go to the documentation of this file.
00001 /*
00002  * IMPORTANT: READ BEFORE DOWNLOADING, COPYING, INSTALLING OR USING. By
00003  * downloading, copying, installing or using the software you agree to
00004  * this license. If you do not agree to this license, do not download,
00005  * install, copy or use the software.
00006  * 
00007  * Intel Open Source License 
00008  * 
00009  * Copyright (c) 2004 Intel Corporation. All rights reserved. 
00010  * 
00011  * Redistribution and use in source and binary forms, with or without
00012  * modification, are permitted provided that the following conditions are
00013  * met:
00014  * 
00015  *   Redistributions of source code must retain the above copyright
00016  *   notice, this list of conditions and the following disclaimer.
00017  * 
00018  *   Redistributions in binary form must reproduce the above copyright
00019  *   notice, this list of conditions and the following disclaimer in the
00020  *   documentation and/or other materials provided with the distribution.
00021  * 
00022  *   Neither the name of the Intel Corporation nor the names of its
00023  *   contributors may be used to endorse or promote products derived from
00024  *   this software without specific prior written permission.
00025  *  
00026  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
00027  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
00028  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
00029  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE INTEL OR
00030  * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
00031  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
00032  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
00033  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
00034  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
00035  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
00036  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00037  */
00038 
00039 #include <sys/stat.h>
00040 #include <oasys/compat/inet_aton.h>
00041 #include <oasys/io/FileIOClient.h>
00042 #include <oasys/io/NetUtils.h>
00043 #include <oasys/util/Pointers.h>
00044 #include <oasys/util/ScratchBuffer.h>
00045 #include <oasys/util/XDRUtils.h>
00046 
00047 #include "APIServer.h"
00048 #include "bundling/Bundle.h"
00049 #include "bundling/BundleEvent.h"
00050 #include "bundling/BundleDaemon.h"
00051 #include "cmd/APICommand.h"
00052 #include "reg/APIRegistration.h"
00053 #include "reg/RegistrationTable.h"
00054 #include "routing/BundleRouter.h"
00055 #include "storage/GlobalStore.h"
00056 
00057 #ifndef MIN
00058 #define MIN(x, y) ((x)<(y) ? (x) : (y))
00059 #endif
00060 
00061 
#ifdef __CYGWIN__
// Cygwin's xdr.h declares its functions K&R style (without argument
// lists), so we need to make the declarations more specific here to
// avoid errors when compiling with g++ instead of gcc.

extern "C" {
    extern void xdrmem_create(XDR *__xdrs, __const caddr_t __addr,
                              u_int __size, enum xdr_op __xop);
}

// These defines replace the xdr_setpos / xdr_getpos / xdr_putlong
// macros from xdr.h. Each one casts the corresponding entry of the
// XDR ops table from a pointer to a function with no declared
// arguments (which is what we get from xdr.h) into a pointer to a
// function with a full argument list (i.e. K&R to ANSI C), and then
// calls through it.

// xdr_setpos: call x_setpostn as void (*)(XDR *, int)
typedef void (*xdr_setpos_t)(XDR *, int);
#undef xdr_setpos
#define xdr_setpos(xdrs, pos) ((xdr_setpos_t)(*(xdrs)->x_ops->x_setpostn))(xdrs, pos)

// xdr_getpos: call x_getpostn as int (*)(XDR *)
typedef int (*xdr_getpos_t)(XDR *);
#undef xdr_getpos
#define xdr_getpos(xdrs) ((xdr_getpos_t)(*(xdrs)->x_ops->x_getpostn))(xdrs)

// xdr_putlong: call x_putlong as int (*)(XDR *, long *)
typedef int (*xdr_putlong_t)(XDR *, long *);
#undef xdr_putlong
#define xdr_putlong(xdrs, ptr) ((xdr_putlong_t)(*(xdrs)->x_ops->x_putlong))(xdrs, ptr)

#endif
00089 
00090 namespace dtn {
00091 
00092 //----------------------------------------------------------------------
00093 APIServer::APIServer()
00094     : TCPServerThread("APIServer", "/dtn/apiserver", DELETE_ON_EXIT)
00095 {
00096     local_addr_ = htonl(INADDR_LOOPBACK);
00097     local_port_ = DTN_IPC_PORT;
00098 
00099     // override the defaults via environment variables, if given
00100     char *env;
00101     if ((env = getenv("DTNAPI_ADDR")) != NULL) {
00102         if (inet_aton(env, (struct in_addr*)&local_addr_) == 0)
00103         {
00104             log_err("DTNAPI_ADDR environment variable (%s) "
00105                     "not a valid ip address, using default of localhost",
00106                     env);
00107             // in case inet_aton touched it
00108             local_addr_ = htonl(INADDR_LOOPBACK);
00109         } else {
00110             log_debug("local address set to %s by DTNAPI_ADDR "
00111                       "environment variable", env);
00112         }
00113     }
00114 
00115     if ((env = getenv("DTNAPI_PORT")) != NULL) {
00116         char *end;
00117         u_int port = strtoul(env, &end, 10);
00118         if (*end != '\0' || port > 0xffff)
00119         {
00120             log_err("DTNAPI_PORT environment variable (%s) "
00121                     "not a valid ip port, using default of %d",
00122                     env, DTN_IPC_PORT);
00123             port = DTN_IPC_PORT;
00124         } else {
00125             log_debug("api port set to %s by DTNAPI_PORT "
00126                       "environment variable", env);
00127         }
00128         local_port_ = (u_int16_t)port;
00129     }
00130 
00131     if (local_addr_ != INADDR_ANY || local_port_ != 0) {
00132         log_debug("APIServer init (evironment set addr %s port %d)",
00133                   intoa(local_addr_), local_port_);
00134     } else {
00135         log_debug("APIServer init");
00136     }
00137 
00138     oasys::TclCommandInterp::instance()->reg(new APICommand(this));
00139 }
00140 
00141 //----------------------------------------------------------------------
00142 void
00143 APIServer::accepted(int fd, in_addr_t addr, u_int16_t port)
00144 {
00145     APIClient* c = new APIClient(fd, addr, port);
00146     c->start();
00147 }
00148 
00149 //----------------------------------------------------------------------
00150 APIClient::APIClient(int fd, in_addr_t addr, u_int16_t port)
00151     : Thread("APIClient", DELETE_ON_EXIT),
00152       TCPClient(fd, addr, port, "/dtn/apiclient"),
00153       notifier_(logpath_)
00154 {
00155     // note that we skip space for the message length and code/status
00156     xdrmem_create(&xdr_encode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_ENCODE);
00157     xdrmem_create(&xdr_decode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_DECODE);
00158 
00159     bindings_ = new APIRegistrationList();
00160 }
00161 
00162 //----------------------------------------------------------------------
/**
 * Destructor: logs the teardown and releases the registration list
 * that the constructor allocated.
 */
APIClient::~APIClient()
{
    log_debug("client destroyed");
    // delete_z is defined outside this file; presumably it deletes
    // the object and zeroes the pointer -- TODO confirm
    delete_z(bindings_);
}
00168 
00169 //----------------------------------------------------------------------
00170 void
00171 APIClient::close_session()
00172 {
00173     TCPClient::close();
00174 
00175     APIRegistration* reg;
00176     while (! bindings_->empty()) {
00177         reg = bindings_->front();
00178         bindings_->pop_front();
00179         
00180         reg->set_active(false);
00181 
00182         if (reg->expired()) {
00183             log_debug("removing expired registration %d", reg->regid());
00184             BundleDaemon::post(new RegistrationExpiredEvent(reg->regid()));
00185         }
00186     }
00187 }
00188 
00189 //----------------------------------------------------------------------
00190 int
00191 APIClient::handle_handshake()
00192 {
00193     u_int32_t handshake;
00194     u_int16_t message_type, ipc_version;
00195     
00196     int ret = readall((char*)&handshake, sizeof(handshake));
00197     if (ret != sizeof(handshake)) {
00198         log_err("error reading handshake: (got %d/%zu), \"error\" %s",
00199                 ret, sizeof(handshake), strerror(errno));
00200         return -1;
00201     }
00202 
00203     message_type = ntohl(handshake) >> 16;
00204     ipc_version = (u_int16_t) (ntohl(handshake) & 0x0ffff);
00205 
00206     if (message_type != DTN_OPEN) {
00207         log_err("handshake (%d)'s message type %d != DTN_OPEN (%d)",
00208                 handshake, message_type, DTN_OPEN);
00209         return -1;
00210     }
00211     
00212     if (ipc_version != DTN_IPC_VERSION) {
00213         log_err("handshake (%d)'s version %d != DTN_IPC_VERSION (%d)",
00214                 handshake, ipc_version, DTN_IPC_VERSION);
00215         return -1;
00216     }
00217     
00218     ret = writeall((char*)&handshake, sizeof(handshake));
00219     if (ret != sizeof(handshake)) {
00220         log_err("error writing handshake: %s", strerror(errno));
00221         return -1;
00222     }
00223 
00224     return 0;
00225 }
00226 
00227 //----------------------------------------------------------------------
00228 void
00229 APIClient::run()
00230 {
00231     int ret;
00232     u_int8_t type;
00233     u_int32_t len;
00234     
00235     log_info("new session %s:%d -> %s:%d",
00236              intoa(local_addr()), local_port(),
00237              intoa(remote_addr()), remote_port());
00238 
00239     if (handle_handshake() != 0) {
00240         close_session();
00241         return;
00242     }
00243     
00244     while (true) {
00245         xdr_setpos(&xdr_encode_, 0);
00246         xdr_setpos(&xdr_decode_, 0);
00247 
00248         // read the incoming message into the fourth byte of the
00249         // buffer, since the typecode + message length is only five
00250         // bytes long, but the XDR engines are set to point at the
00251         // eighth byte of the buffer
00252         ret = read(&buf_[3], DTN_MAX_API_MSG);
00253             
00254         if (ret <= 0) {
00255             log_warn("client error or disconnection");
00256             close_session();
00257             return;
00258         }
00259         
00260         if (ret < 5) {
00261             log_err("ack!! can't handle really short read...");
00262             close_session();
00263             return;
00264         }
00265 
00266         // NOTE: this protocol is duplicated in the implementation of
00267         // handle_begin_poll to take care of a cancel_poll request
00268         // coming in while the thread is waiting for bundles so any
00269         // modifications must be propagated there
00270         type = buf_[3];
00271         memcpy(&len, &buf_[4], sizeof(len));
00272 
00273         len = ntohl(len);
00274 
00275         ret -= 5;
00276         log_debug("got %s (%d/%d bytes)", dtnipc_msgtoa(type), ret, len);
00277 
00278         // if we didn't get the whole message, loop to get the rest,
00279         // skipping the header bytes and the already-read amount
00280         if (ret < (int)len) {
00281             int toget = len - ret;
00282             if (readall(&buf_[8 + ret], toget) != toget) {
00283                 log_err("error reading message remainder: %s",
00284                         strerror(errno));
00285                 close_session();
00286                 return;
00287             }
00288         }
00289 
00290         // dispatch to the handler routine
00291         switch(type) {
00292 #define DISPATCH(_type, _fn)                    \
00293         case _type:                             \
00294             ret = _fn();                        \
00295             break;
00296             
00297             DISPATCH(DTN_LOCAL_EID,         handle_local_eid);
00298             DISPATCH(DTN_REGISTER,          handle_register);
00299             DISPATCH(DTN_UNREGISTER,        handle_unregister);
00300             DISPATCH(DTN_FIND_REGISTRATION, handle_find_registration);
00301             DISPATCH(DTN_SEND,              handle_send);
00302             DISPATCH(DTN_BIND,              handle_bind);
00303             DISPATCH(DTN_UNBIND,            handle_unbind);
00304             DISPATCH(DTN_RECV,              handle_recv);
00305             DISPATCH(DTN_BEGIN_POLL,        handle_begin_poll);
00306             DISPATCH(DTN_CANCEL_POLL,       handle_cancel_poll);
00307             DISPATCH(DTN_CLOSE,             handle_close);
00308 #undef DISPATCH
00309 
00310         default:
00311             log_err("unknown message type code 0x%x", type);
00312             ret = DTN_EMSGTYPE;
00313             break;
00314         }
00315 
00316         // if the handler returned -1, then the session should be
00317         // immediately terminated
00318         if (ret == -1) {
00319             close_session();
00320             return;
00321         }
00322         
00323         // send the response
00324         if (send_response(ret) != 0) {
00325             return;
00326         }
00327         // if there was an IPC communication error or unknown message
00328         // type, close terminate the session
00329         // XXX/matt we could potentially close on all errors, not just these 2
00330         if (ret == DTN_ECOMM || ret == DTN_EMSGTYPE) {
00331             close_session();
00332             return;
00333         }
00334         
00335     } // while(1)
00336 }
00337 
00338 //----------------------------------------------------------------------
00339 int
00340 APIClient::send_response(int ret)
00341 {
00342     u_int32_t len, msglen;
00343     
00344     // make sure the dispatched function returned a valid error
00345     // code
00346     ASSERT(ret == DTN_SUCCESS ||
00347            (DTN_ERRBASE <= ret && ret <= DTN_ERRMAX));
00348         
00349     // fill in the reply message with the status code and the
00350     // length of the reply. note that if there is no reply, then
00351     // the xdr position should still be zero
00352     len = xdr_getpos(&xdr_encode_);
00353     log_debug("building reply: status %s, length %d",
00354               dtnipc_msgtoa(ret), len);
00355 
00356     msglen = len + 8;
00357     ret = ntohl(ret);
00358     len = htonl(len);
00359 
00360     memcpy(buf_,     &ret, sizeof(ret));
00361     memcpy(&buf_[4], &len, sizeof(len));
00362 
00363     log_debug("sending %d byte reply message", msglen);
00364     if (writeall(buf_, msglen) != (int)msglen) {
00365         log_err("error sending reply: %s", strerror(errno));
00366         close_session();
00367         return -1;
00368     }
00369 
00370     return 0;
00371 }
00372         
00373 //----------------------------------------------------------------------
00374 bool
00375 APIClient::is_bound(u_int32_t regid)
00376 {
00377     APIRegistrationList::iterator iter;
00378     for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
00379         if ((*iter)->regid() == regid) {
00380             return true;
00381         }
00382     }
00383 
00384     return false;
00385 }
00386 
00387 //----------------------------------------------------------------------
00388 int
00389 APIClient::handle_local_eid()
00390 {
00391     dtn_service_tag_t service_tag;
00392     dtn_endpoint_id_t local_eid;
00393     
00394     // unpack the request
00395     if (!xdr_dtn_service_tag_t(&xdr_decode_, &service_tag))
00396     {
00397         log_err("error in xdr unpacking arguments");
00398         return DTN_EXDR;
00399     }
00400 
00401     // build up the response
00402     EndpointID eid(BundleDaemon::instance()->local_eid());
00403     if (eid.append_service_tag(service_tag.tag) == false) {
00404         log_err("error appending service tag");
00405         return DTN_EINVAL;
00406     }
00407 
00408     memset(&local_eid, 0, sizeof(local_eid));
00409     eid.copyto(&local_eid);
00410     
00411     // pack the response
00412     if (!xdr_dtn_endpoint_id_t(&xdr_encode_, &local_eid)) {
00413         log_err("internal error in xdr: xdr_dtn_endpoint_id_t");
00414         return DTN_EXDR;
00415     }
00416 
00417     log_debug("get_local_eid encoded %d byte response",
00418               xdr_getpos(&xdr_encode_));
00419     
00420     return DTN_SUCCESS;
00421 }
00422 
00423 //----------------------------------------------------------------------
00424 int
00425 APIClient::handle_register()
00426 {
00427     APIRegistration* reg;
00428     Registration::failure_action_t action;
00429     EndpointIDPattern endpoint;
00430     std::string script;
00431     
00432     dtn_reg_info_t reginfo;
00433 
00434     memset(&reginfo, 0, sizeof(reginfo));
00435     
00436     // unpack and parse the request
00437     if (!xdr_dtn_reg_info_t(&xdr_decode_, &reginfo))
00438     {
00439         log_err("error in xdr unpacking arguments");
00440         return DTN_EXDR;
00441     }
00442 
00443     // make sure we free any dynamically-allocated bits in the
00444     // incoming structure before we exit the proc
00445     oasys::ScopeXDRFree x((xdrproc_t)xdr_dtn_reg_info_t, (char*)&reginfo);
00446     
00447     endpoint.assign(&reginfo.endpoint);
00448 
00449     if (!endpoint.valid()) {
00450         log_err("invalid endpoint id in register: '%s'",
00451                 reginfo.endpoint.uri);
00452         return DTN_EINVAL;
00453     }
00454     
00455     switch (reginfo.failure_action) {
00456     case DTN_REG_DEFER: action = Registration::DEFER; break;
00457     case DTN_REG_DROP:  action = Registration::DROP;  break;
00458     case DTN_REG_EXEC:  action = Registration::EXEC;  break;
00459     default: {
00460         log_err("invalid failure action code 0x%x", reginfo.failure_action);
00461         return DTN_EINVAL;
00462     }
00463     }
00464 
00465     if (action == Registration::EXEC) {
00466         script.assign(reginfo.script.script_val, reginfo.script.script_len);
00467     }
00468 
00469     u_int32_t regid = GlobalStore::instance()->next_regid();
00470     reg = new APIRegistration(regid, endpoint, action,
00471                               reginfo.expiration, script);
00472 
00473     if (! reginfo.init_passive) {
00474         // XXX/demmer fixme to allow multiple registrations
00475         if (! bindings_->empty()) {
00476             log_err("error: handle already bound to a registration");
00477             return DTN_EBUSY;
00478         }
00479         
00480         // store the registration in the list for this session
00481         bindings_->push_back(reg);
00482         reg->set_active(true);
00483     }
00484     
00485     BundleDaemon::post_and_wait(new RegistrationAddedEvent(reg, EVENTSRC_APP),
00486                                 &notifier_);
00487     
00488     // fill the response with the new registration id
00489     if (!xdr_dtn_reg_id_t(&xdr_encode_, &regid)) {
00490         log_err("internal error in xdr: xdr_dtn_reg_id_t");
00491         return DTN_EXDR;
00492     }
00493     
00494     return DTN_SUCCESS;
00495 }
00496 
00497 //----------------------------------------------------------------------
/**
 * Handler for DTN_UNREGISTER: remove the registration whose id is
 * given in the request.
 *
 * @return DTN_SUCCESS; DTN_EXDR on a decode failure; DTN_ENOTFOUND if
 *         no such registration exists; DTN_EBUSY if it is active but
 *         not bound to this session; DTN_EINVAL if it is bound here,
 *         active and already expired
 */
int
APIClient::handle_unregister()
{
    // decode the registration id
    dtn_reg_id_t regid;
    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid))
    {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    Registration* reg = BundleDaemon::instance()->reg_table()->get(regid);
    if (reg == NULL) {
        return DTN_ENOTFOUND;
    }

    if (reg->active()) {
        // it's an error to call unregister on a registration that's
        // in use by someone else
        if (! is_bound(reg->regid())) {
            return DTN_EBUSY;
        }

        // special case: we're unregistering a registration that is
        // currently bound to this session. it is not removed here but
        // left around in the expired state, so it will be cleaned up
        // when the application either calls dtn_unbind() or closes
        // the api socket
        if (reg->expired()) {
            return DTN_EINVAL;
        }

        reg->force_expire();
        ASSERT(reg->expired());
        return DTN_SUCCESS;
    }

    // inactive registration: have the daemon remove it
    BundleDaemon::post_and_wait(new RegistrationRemovedEvent(reg),
                                &notifier_);

    return DTN_SUCCESS;
}
00541 
00542 //----------------------------------------------------------------------
/**
 * Handler for DTN_FIND_REGISTRATION: look up a registration by the
 * endpoint id given in the request and reply with its registration
 * id.
 *
 * @return DTN_SUCCESS; DTN_EXDR on a decode/encode failure;
 *         DTN_EINVAL if the endpoint id is not valid; DTN_ENOTFOUND
 *         if the registration table has no entry for it
 */
int
APIClient::handle_find_registration()
{
    // decode the endpoint id to search for
    dtn_endpoint_id_t app_eid;
    if (!xdr_dtn_endpoint_id_t(&xdr_decode_, &app_eid))
    {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    EndpointIDPattern pattern;
    pattern.assign(&app_eid);
    if (!pattern.valid()) {
        log_err("invalid endpoint id in find_registration: '%s'",
                app_eid.uri);
        return DTN_EINVAL;
    }

    Registration* found = BundleDaemon::instance()->reg_table()->get(pattern);
    if (found == NULL) {
        return DTN_ENOTFOUND;
    }

    // fill the response with the id of the registration we found
    u_int32_t regid = found->regid();
    if (!xdr_dtn_reg_id_t(&xdr_encode_, &regid)) {
        log_err("internal error in xdr: xdr_dtn_reg_id_t");
        return DTN_EXDR;
    }

    return DTN_SUCCESS;
}
00579 
00580 //----------------------------------------------------------------------
/**
 * Handler for DTN_BIND: bind this session to an existing
 * registration, identified by the registration id in the request,
 * and mark that registration active.
 *
 * @return DTN_SUCCESS; DTN_EXDR on a decode failure; DTN_ENOTFOUND if
 *         the id is unknown or does not refer to an APIRegistration;
 *         DTN_EBUSY if the registration is already active or this
 *         session already holds a binding
 */
int
APIClient::handle_bind()
{
    // decode the registration id
    dtn_reg_id_t regid;
    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid)) {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    // find it in the daemon's registration table
    Registration* reg = BundleDaemon::instance()->reg_table()->get(regid);
    if (reg == NULL) {
        log_err("can't find registration %d", regid);
        return DTN_ENOTFOUND;
    }

    // only API registrations can be bound to a session
    APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
    if (!api_reg) {
        log_crit("registration %d is not an API registration!!",
                 regid);
        return DTN_ENOTFOUND;
    }

    if (api_reg->active()) {
        log_err("registration %d is already in active mode", regid);
        return DTN_EBUSY;
    }

    // XXX/demmer fixme to allow multiple registrations
    if (! bindings_->empty()) {
        log_err("error: handle already bound to a registration");
        return DTN_EBUSY;
    }

    // record the binding for this session and activate it
    bindings_->push_back(api_reg);
    api_reg->set_active(true);

    log_info("DTN_BIND: bound to registration %d", reg->regid());

    return DTN_SUCCESS;
}
00627     
00628 //----------------------------------------------------------------------
/**
 * Handler for DTN_UNBIND: remove the binding between this session
 * and the registration whose id is given in the request, and mark the
 * registration inactive.
 *
 * @return DTN_SUCCESS; DTN_EXDR on a decode failure; DTN_ENOTFOUND if
 *         the id is unknown, does not refer to an APIRegistration, or
 *         is not bound to this session
 */
int
APIClient::handle_unbind()
{
    // decode the registration id
    dtn_reg_id_t regid;
    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid)) {
        log_err("error in xdr unpacking arguments");
        return DTN_EXDR;
    }

    // find it in the daemon's registration table
    Registration* reg = BundleDaemon::instance()->reg_table()->get(regid);
    if (reg == NULL) {
        log_err("can't find registration %d", regid);
        return DTN_ENOTFOUND;
    }

    APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
    if (!api_reg) {
        log_crit("registration %d is not an API registration!!",
                 regid);
        return DTN_ENOTFOUND;
    }

    // search this session's bindings for the registration
    APIRegistrationList::iterator pos = bindings_->begin();
    while (pos != bindings_->end() && *pos != api_reg) {
        ++pos;
    }

    if (pos == bindings_->end()) {
        log_err("registration %d not bound to this api client", regid);
        return DTN_ENOTFOUND;
    }

    // drop the binding and deactivate the registration
    bindings_->erase(pos);
    ASSERT(api_reg->active());
    api_reg->set_active(false);

    log_info("DTN_UNBIND: unbound from registration %d", regid);
    return DTN_SUCCESS;
}
00671     
00672 //----------------------------------------------------------------------
00673 int
00674 APIClient::handle_send()
00675 {
00676     Bundle* b;
00677     dtn_bundle_spec_t spec;
00678     dtn_bundle_payload_t payload;
00679 
00680     memset(&spec, 0, sizeof(spec));
00681     memset(&payload, 0, sizeof(payload));
00682     
00683     /* Unpack the arguments */
00684     if (!xdr_dtn_bundle_spec_t(&xdr_decode_, &spec) ||
00685         !xdr_dtn_bundle_payload_t(&xdr_decode_, &payload))
00686     {
00687         log_err("error in xdr unpacking arguments");
00688         return DTN_EXDR;
00689     }
00690 
00691     b = new Bundle();
00692 
00693     // make sure the xdr unpacked call to malloc is cleaned up
00694     oasys::ScopeMalloc m((payload.location == DTN_PAYLOAD_MEM) ?
00695                          payload.dtn_bundle_payload_t_u.buf.buf_val :
00696                          payload.dtn_bundle_payload_t_u.filename.filename_val);
00697     
00698     // assign the addressing fields
00699     b->source_.assign(&spec.source);
00700     b->dest_.assign(&spec.dest);
00701     if (spec.replyto.uri[0] == '\0') {
00702         b->replyto_.assign(EndpointID::NULL_EID());
00703     } else {
00704         b->replyto_.assign(&spec.replyto);
00705     }
00706     b->custodian_.assign(EndpointID::NULL_EID());
00707      
00708     oasys::StringBuffer error;
00709     if (!b->validate(&error)) {
00710         log_err("bundle validation failed: %s", error.data());
00711         return DTN_EINVAL;
00712     }
00713     
00714     // the priority code
00715     switch (spec.priority) {
00716 #define COS(_cos) case _cos: b->priority_ = Bundle::_cos; break;
00717         COS(COS_BULK);
00718         COS(COS_NORMAL);
00719         COS(COS_EXPEDITED);
00720         COS(COS_RESERVED);
00721 #undef COS
00722     default:
00723         log_err("invalid priority level %d", (int)spec.priority);
00724         return DTN_EINVAL;
00725     };
00726 
00727     // delivery options
00728     if (spec.dopts & DOPTS_CUSTODY)
00729         b->custody_requested_ = true;
00730     
00731     if (spec.dopts & DOPTS_DELIVERY_RCPT)
00732         b->delivery_rcpt_ = true;
00733 
00734     if (spec.dopts & DOPTS_RECEIVE_RCPT)
00735         b->receive_rcpt_ = true;
00736 
00737     if (spec.dopts & DOPTS_FORWARD_RCPT)
00738         b->forward_rcpt_ = true;
00739 
00740     if (spec.dopts & DOPTS_CUSTODY_RCPT)
00741         b->custody_rcpt_ = true;
00742 
00743     if (spec.dopts & DOPTS_DELETE_RCPT)
00744         b->deletion_rcpt_ = true;
00745 
00746     // expiration time
00747     b->expiration_ = spec.expiration;
00748 
00749     // finally, the payload
00750     size_t payload_len;
00751     switch ( payload.location ) {
00752     case DTN_PAYLOAD_MEM:
00753         b->payload_.set_data((u_char*)payload.dtn_bundle_payload_t_u.buf.buf_val,
00754                              payload.dtn_bundle_payload_t_u.buf.buf_len);
00755         payload_len = payload.dtn_bundle_payload_t_u.buf.buf_len;
00756         break;
00757         
00758     case DTN_PAYLOAD_FILE:
00759         char filename[PATH_MAX];
00760         FILE * file;
00761         struct stat finfo;
00762         int r, left;
00763         u_char buffer[4096];
00764 
00765         sprintf(filename, "%.*s", 
00766                 (int)payload.dtn_bundle_payload_t_u.filename.filename_len,
00767                 payload.dtn_bundle_payload_t_u.filename.filename_val);
00768 
00769         if (stat(filename, &finfo) || (file = fopen(filename, "r")) == NULL)
00770         {
00771             log_err("payload file %s does not exist!", filename);
00772             return DTN_EINVAL;
00773         }
00774         
00775         payload_len = finfo.st_size;
00776         b->payload_.set_length(payload_len);
00777 
00778         b->payload_.reopen_file();
00779         left = payload_len;
00780         r = 0;
00781         while (left > 0)
00782         {
00783             r = fread(buffer, 1, (left>4096)?4096:left, file);
00784             
00785             if (r)
00786             {
00787                 b->payload_.append_data(buffer, r);
00788                 left -= r;
00789             }
00790             else
00791             {
00792                 sleep(1); // pause before re-reading
00793             }
00794         }
00795 
00796         fclose(file);
00797         b->payload_.close_file();
00798         break;
00799         
00800     default:
00801         log_err("payload.location of %d unknown", payload.location);
00802         return DTN_EINVAL;
00803     }
00804 
00805     //  before posting the received event, fill in the bundle id struct
00806     dtn_bundle_id_t id;
00807     memcpy(&id.source, &spec.source, sizeof(dtn_endpoint_id_t));
00808     id.creation_secs    = b->creation_ts_.seconds_;
00809     id.creation_subsecs = b->creation_ts_.seqno_;
00810     
00811     log_info("DTN_SEND bundle *%p", b);
00812     
00813     // deliver the bundle
00814     // Note: the bundle state may change once it has been posted
00815     BundleDaemon::post_and_wait(new BundleReceivedEvent(b, EVENTSRC_APP),
00816                                 &notifier_);
00817 
00818     // return the bundle id struct
00819     if (!xdr_dtn_bundle_id_t(&xdr_encode_, &id)) {
00820         log_err("internal error in xdr: xdr_dtn_bundle_id_t");
00821         return DTN_EXDR;
00822     }
00823     
00824     return DTN_SUCCESS;
00825 }
00826 
00827 // Size for temporary memory buffer used when delivering bundles
00828 // via files.
00829 #define DTN_FILE_DELIVERY_BUF_SIZE 1000
00830 
00831 //----------------------------------------------------------------------
00832 int
00833 APIClient::handle_recv()
00834 {
00835     dtn_bundle_spec_t             spec;
00836     dtn_bundle_payload_t          payload;
00837     dtn_bundle_payload_location_t location;
00838     dtn_timeval_t                 timeout;
00839     oasys::ScratchBuffer<u_char*> buf;
00840     APIRegistration*              reg = NULL;
00841     bool                          sock_ready = false;
00842 
00843     // unpack the arguments
00844     if ((!xdr_dtn_bundle_payload_location_t(&xdr_decode_, &location)) ||
00845         (!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
00846     {
00847         log_err("error in xdr unpacking arguments");
00848         return DTN_EXDR;
00849     }
00850     
00851     int err = wait_for_bundle("recv", timeout, &reg, &sock_ready);
00852     if (err != 0) {
00853         return err;
00854     }
00855     
00856     // if there's data on the socket, that either means the socket was
00857     // closed by an exiting application or the app is violating the
00858     // protocol...
00859     if (sock_ready) {
00860         log_debug("handle_recv: api socket ready -- trying to read one byte");
00861         char b;
00862         if (read(&b, 1) != 0) {
00863             log_err("handle_recv: protocol error -- "
00864                     "data arrived or error while blocked in recv");
00865             return DTN_ECOMM;
00866         }
00867 
00868         log_info("IPC socket closed while blocked in read... "
00869                  "application must have exited");
00870         return -1;
00871     }
00872     
00873     BundleRef bref("APIClient::handle_recv");
00874     bref = reg->bundle_list()->pop_front();
00875     Bundle* b = bref.object();
00876     ASSERT(b != NULL);
00877     
00878     log_debug("handle_recv: popped bundle %d for registration %d (timeout %d)",
00879               b->bundleid_, reg->regid(), timeout);
00880     
00881     memset(&spec, 0, sizeof(spec));
00882     memset(&payload, 0, sizeof(payload));
00883 
00884     // copyto will malloc string buffer space that needs to be freed
00885     // at the end of the fn
00886     b->source_.copyto(&spec.source);
00887     b->dest_.copyto(&spec.dest);
00888     b->replyto_.copyto(&spec.replyto);
00889 
00890     // XXX/demmer copy delivery options and class of service fields
00891 
00892     // XXX/demmer verify bundle size constraints
00893     payload.location = location;
00894     
00895     if (location == DTN_PAYLOAD_MEM) {
00896         // the app wants the payload in memory
00897         // XXX/demmer verify bounds
00898 
00899         size_t payload_len = b->payload_.length();
00900         payload.dtn_bundle_payload_t_u.buf.buf_len = payload_len;
00901         if (payload_len != 0) {
00902             buf.reserve(payload_len);
00903             payload.dtn_bundle_payload_t_u.buf.buf_val =
00904                 (char*)b->payload_.read_data(0, payload_len, buf.buf());
00905         } else {
00906             payload.dtn_bundle_payload_t_u.buf.buf_val = 0;
00907         }
00908         
00909     } else if (location == DTN_PAYLOAD_FILE) {
00910         oasys::FileIOClient tmpfile;
00911         char *tdir, templ[64];
00912 
00913         tdir = getenv("TMP");
00914         if (tdir == NULL) {
00915             tdir = getenv("TEMP");
00916         }
00917         if (tdir == NULL) {
00918             tdir = "/tmp";
00919         }
00920 
00921         snprintf(templ, sizeof(templ), "%s/bundlePayload_XXXXXX", tdir);
00922 
00923         if (tmpfile.mkstemp(templ) == -1) {
00924             log_err("can't open temporary file to deliver bundle");
00925             return DTN_EINTERNAL;
00926         }
00927         
00928         if (b->payload_.location() == BundlePayload::MEMORY) {
00929             tmpfile.writeall((char*)b->payload_.memory_data(),
00930                              b->payload_.length());
00931             
00932         } else {
00933             b->payload_.copy_file(&tmpfile);
00934         }
00935 
00936         payload.dtn_bundle_payload_t_u.filename.filename_val = (char*)tmpfile.path();
00937         payload.dtn_bundle_payload_t_u.filename.filename_len = tmpfile.path_len();
00938         tmpfile.close();
00939         
00940     } else {
00941         log_err("payload location %d not understood", location);
00942         return DTN_EINVAL;
00943     }
00944 
00945     if (!xdr_dtn_bundle_spec_t(&xdr_encode_, &spec))
00946     {
00947         log_err("internal error in xdr: xdr_dtn_bundle_spec_t");
00948         return DTN_EXDR;
00949     }
00950     
00951     if (!xdr_dtn_bundle_payload_t(&xdr_encode_, &payload))
00952     {
00953         log_err("internal error in xdr: xdr_dtn_bundle_payload_t");
00954         return DTN_EXDR;
00955     }
00956     
00957     log_info("DTN_RECV: "
00958              "successfully delivered bundle %d to registration %d",
00959              b->bundleid_, reg->regid());
00960     
00961     BundleDaemon::post(new BundleDeliveredEvent(b, reg));
00962 
00963     return DTN_SUCCESS;
00964 }
00965 
00966 //----------------------------------------------------------------------
00967 int
00968 APIClient::handle_begin_poll()
00969 {
00970     dtn_timeval_t    timeout;
00971     APIRegistration* reg = NULL;
00972     bool             sock_ready = false;
00973     
00974     // unpack the arguments
00975     if ((!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
00976     {
00977         log_err("error in xdr unpacking arguments");
00978         return DTN_EXDR;
00979     }
00980 
00981     int err = wait_for_bundle("poll", timeout, &reg, &sock_ready);
00982     if (err != 0) {
00983         return err;
00984     }
00985 
00986     // if there's data on the socket, then the application either quit
00987     // and closed the socket, or called dtn_poll_cancel
00988     if (sock_ready) {
00989         log_debug("handle_begin_poll: "
00990                   "api socket ready -- trying to read one byte");
00991         char type;
00992         
00993         int ret = read(&type, 1);
00994         if (ret == 0) {
00995             log_info("IPC socket closed while blocked in read... "
00996                      "application must have exited");
00997             return -1;
00998         }
00999 
01000         if (ret == -1) {
01001             log_err("handle_begin_poll: protocol error -- "
01002                     "error while blocked in poll");
01003             return DTN_ECOMM;
01004         }
01005 
01006         if (type != DTN_CANCEL_POLL) {
01007             log_err("handle_poll: error got unexpected message '%s' "
01008                     "while blocked in poll", dtnipc_msgtoa(type));
01009             return DTN_ECOMM;
01010         }
01011 
01012         // read in the length which must be zero
01013         u_int32_t len;
01014         ret = read((char*)&len, 4);
01015         if (ret != 4 || len != 0) {
01016             log_err("handle_begin_poll: protocol error -- "
01017                     "error getting cancel poll length");
01018             return DTN_ECOMM;
01019         }
01020 
01021         // immediately send the response to the poll cancel, then
01022         // we return from the handler which will follow it with the
01023         // response code to the original poll request
01024         send_response(DTN_SUCCESS);
01025     }
01026     
01027     return DTN_SUCCESS;
01028 }
01029 
01030 //----------------------------------------------------------------------
01031 int
01032 APIClient::handle_cancel_poll()
01033 {
01034     // the only reason we should get in here is if the call to
01035     // dtn_begin_poll() returned but the app still called cancel_poll
01036     // and so the messages crossed. but, since there's nothing wrong
01037     // with this, we just return success in both cases
01038     
01039     return DTN_SUCCESS;
01040 }
01041         
01042 //----------------------------------------------------------------------
01043 int
01044 APIClient::wait_for_bundle(const char* operation, dtn_timeval_t dtn_timeout,
01045                            APIRegistration** regp, bool* sock_ready)
01046 {
01047     APIRegistration* reg;
01048     
01049     // XXX/demmer implement this for multiple registrations by
01050     // building up a poll vector here. for now we assert in bind that
01051     // there's only one binding.
01052     
01053     if (bindings_->empty()) {
01054         log_err("wait_for_bundle(%s): no bound registration", 
01055                 operation);
01056         return DTN_EINVAL;
01057     }
01058     
01059     reg = bindings_->front();
01060 
01061     // short-circuit the poll
01062     if (reg->bundle_list()->size() != 0) {
01063         log_debug("wait_for_bundle(%s): "
01064                   "immediately returning bundle for reg %d",
01065                   operation, reg->regid());
01066         *regp = reg;
01067         return 0;
01068     }
01069 
01070     int timeout = (int)dtn_timeout;
01071     if (timeout < -1) {
01072         log_err("wait_for_bundle(%s): "
01073                 "invalid timeout value %d", operation, timeout);
01074         return DTN_EINVAL;
01075     }
01076 
01077     struct pollfd pollfds[2];
01078 
01079     struct pollfd* bundle_poll = &pollfds[0];
01080     bundle_poll->fd            = reg->bundle_list()->notifier()->read_fd();
01081     bundle_poll->events        = POLLIN;
01082     bundle_poll->revents       = 0;
01083     
01084     struct pollfd* sock_poll   = &pollfds[1];
01085     sock_poll->fd              = TCPClient::fd_;
01086     sock_poll->events          = POLLIN | POLLERR;
01087     sock_poll->revents         = 0;
01088 
01089     log_debug("wait_for_bundle(%s): "
01090               "blocking to get bundle for registration %d (timeout %d)",
01091               operation, reg->regid(), timeout);
01092     int nready = oasys::IO::poll_multiple(&pollfds[0], 2, timeout,
01093                                           NULL, logpath_);
01094 
01095     if (nready == oasys::IOTIMEOUT) {
01096         log_debug("wait_for_bundle(%s): timeout waiting for bundle",
01097                   operation);
01098         return DTN_ETIMEOUT;
01099 
01100     } else if (nready <= 0) {
01101         log_err("wait_for_bundle(%s): unexpected error polling for bundle",
01102                 operation);
01103         return DTN_EINTERNAL;
01104     }
01105 
01106     ASSERT(nready == 1);
01107     
01108     // if there's data on the socket, that either means the socket was
01109     // closed by an exiting application or the app is violating the
01110     // protocol...
01111     if (sock_poll->revents != 0) {
01112         *sock_ready = true;
01113         return 0;
01114     }
01115 
01116     // otherwise, there should be data on the bundle list
01117     if (bundle_poll->revents == 0) {
01118         log_crit("wait_for_bundle(%s): unexpected error polling for bundle: "
01119                  "neither file descriptor is ready", operation);
01120         return DTN_EINTERNAL;
01121     }
01122 
01123     if (reg->bundle_list()->size() == 0) {
01124         log_err("wait_for_bundle(%s): "
01125                 "bundle list returned ready but no bundle on queue!!",
01126                 operation);
01127         return DTN_EINTERNAL;
01128     }
01129 
01130     *regp = reg;
01131     return 0;
01132 }
01133 
01134 //----------------------------------------------------------------------
01135 int
01136 APIClient::handle_close()
01137 {
01138     log_info("received DTN_CLOSE message; closing API handle");
01139     // return -1 to force the session to close:
01140     return -1;
01141 }
01142 
01143 } // namespace dtn

Generated on Fri Dec 22 14:47:57 2006 for DTN Reference Implementation by  doxygen 1.5.1