00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017 #ifndef _STREAM_CONVERGENCE_LAYER_H_
00018 #define _STREAM_CONVERGENCE_LAYER_H_
00019
00020 #include "ConnectionConvergenceLayer.h"
00021 #include "CLConnection.h"
00022
00023 namespace dtn {
00024
00079 class StreamConvergenceLayer : public ConnectionConvergenceLayer {
00080 public:
00084 StreamConvergenceLayer(const char* logpath, const char* cl_name,
00085 u_int8_t cl_version);
00086
00087 protected:
00091 typedef enum {
00092 SEGMENT_ACK_ENABLED = 1 << 0,
00093 REACTIVE_FRAG_ENABLED = 1 << 1,
00094 NEGATIVE_ACK_ENABLED = 1 << 2,
00095 } contact_header_flags_t;
00096
00100 struct ContactHeader {
00101 u_int32_t magic;
00102 u_int8_t version;
00103 u_int8_t flags;
00104 u_int16_t keepalive_interval;
00105
00106
00107 } __attribute__((packed));
00108
00114 typedef enum {
00115 DATA_SEGMENT = 0x1 << 4,
00116
00117 ACK_SEGMENT = 0x2 << 4,
00118
00119 REFUSE_BUNDLE = 0x3 << 4,
00120 KEEPALIVE = 0x4 << 4,
00121 SHUTDOWN = 0x5 << 4,
00122 } msg_type_t;
00123
00127 typedef enum {
00128 BUNDLE_START = 0x1 << 1,
00129 BUNDLE_END = 0x1 << 0,
00130 } data_segment_flags_t;
00131
00135 typedef enum {
00136 SHUTDOWN_HAS_REASON = 0x1 << 1,
00137 SHUTDOWN_HAS_DELAY = 0x1 << 0,
00138 } shutdown_flags_t;
00139
00143 typedef enum {
00144 SHUTDOWN_NO_REASON = 0xff,
00145 SHUTDOWN_IDLE_TIMEOUT = 0x0,
00146 SHUTDOWN_VERSION_MISMATCH = 0x1,
00147 SHUTDOWN_BUSY = 0x2,
00148 } shutdown_reason_t;
00149
00153 static const char* shutdown_reason_to_str(shutdown_reason_t reason)
00154 {
00155 switch (reason) {
00156 case SHUTDOWN_NO_REASON: return "no reason";
00157 case SHUTDOWN_IDLE_TIMEOUT: return "idle connection";
00158 case SHUTDOWN_VERSION_MISMATCH: return "version mismatch";
00159 case SHUTDOWN_BUSY: return "node is busy";
00160 }
00161 NOTREACHED;
00162 }
00163
00167 class StreamLinkParams : public ConnectionConvergenceLayer::LinkParams {
00168 public:
00169 bool segment_ack_enabled_;
00170 bool negative_ack_enabled_;
00171 u_int keepalive_interval_;
00172 u_int segment_length_;
00173
00174 protected:
00175
00176 StreamLinkParams(bool init_defaults);
00177 };
00178
00182 u_int8_t cl_version_;
00183
00187 class Connection : public CLConnection {
00188 public:
00192 Connection(const char* classname,
00193 const char* logpath,
00194 StreamConvergenceLayer* cl,
00195 StreamLinkParams* params,
00196 bool active_connector);
00197
00199 bool send_pending_data();
00200 void handle_send_bundle(Bundle* bundle);
00201 void handle_cancel_bundle(Bundle* bundle);
00202 void handle_poll_timeout();
00203 void break_contact(ContactEvent::reason_t reason);
00205
00206 protected:
00211 virtual void send_data() = 0;
00212
00214 void initiate_contact();
00215 void process_data();
00216 void check_keepalive();
00218
00219 private:
00221 void note_data_rcvd();
00222 void note_data_sent();
00223 bool send_pending_acks();
00224 bool start_next_bundle();
00225 bool send_next_segment(InFlightBundle* inflight);
00226 bool send_data_todo(InFlightBundle* inflight);
00227 bool finish_bundle(InFlightBundle* inflight);
00228 void check_completed(InFlightBundle* inflight);
00229 void send_keepalive();
00230
00231 void handle_contact_initiation();
00232 bool handle_data_segment(u_int8_t flags);
00233 bool handle_data_todo();
00234 bool handle_ack_segment(u_int8_t flags);
00235 bool handle_refuse_bundle(u_int8_t flags);
00236 bool handle_keepalive(u_int8_t flags);
00237 bool handle_shutdown(u_int8_t flags);
00238 void check_completed(IncomingBundle* incoming);
00240
00245 StreamLinkParams* stream_lparams()
00246 {
00247 StreamLinkParams* ret = dynamic_cast<StreamLinkParams*>(params_);
00248 ASSERT(ret != NULL);
00249 return ret;
00250 }
00251
00252 protected:
00253 InFlightBundle* current_inflight_;
00254 size_t send_segment_todo_;
00255 size_t recv_segment_todo_;
00256 struct timeval data_rcvd_;
00257 struct timeval data_sent_;
00258 struct timeval keepalive_sent_;
00259 bool breaking_contact_;
00260
00261 };
00262
00264 typedef ConnectionConvergenceLayer::LinkParams LinkParams;
00265
00267 void dump_link(Link* link, oasys::StringBuffer* buf);
00269
00271 bool parse_link_params(LinkParams* params,
00272 int argc, const char** argv,
00273 const char** invalidp);
00274 bool finish_init_link(Link* link, LinkParams* params);
00276
00277 };
00278
00279 }
00280
00281 #endif