Roc Toolkit internal modules
Roc Toolkit: real-time audio streaming
Loading...
Searching...
No Matches
tcp_connection_port.h
Go to the documentation of this file.
1/*
2 * Copyright (c) 2019 Roc Streaming authors
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public
5 * License, v. 2.0. If a copy of the MPL was not distributed with this
6 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
7 */
8
9//! @file roc_netio/target_libuv/roc_netio/tcp_connection_port.h
10//! @brief TCP connection.
11
12#ifndef ROC_NETIO_TCP_CONNECTION_PORT_H_
13#define ROC_NETIO_TCP_CONNECTION_PORT_H_
14
15#include <uv.h>
16
18#include "roc_core/atomic.h"
19#include "roc_core/mutex.h"
21#include "roc_core/seqlock.h"
24#include "roc_netio/iconn.h"
28
29namespace roc {
30namespace netio {
31
32//! TCP connection parameters.
34 //! Socket options.
36};
37
38//! TCP connection parameters.
40 //! Local peer address to which we're bound.
42
43 //! Remote peer address to which we're connected.
45};
46
47//! TCP connection type.
49 //! Local peer is client, remote peer is server.
51
52 //! Local peer is server, remote peer is client.
54};
55
56//! TCP connection port.
57//!
58//! Public interfaces
59//! -----------------
60//!
61//! There are two important interfaces related to TCP connection:
62//! - IConn
63//! - IConnHandler
64//!
65//! IConn is implemented by TcpConnectionPort. The interface allows retrieving
66//! connection parameters and performing non-blocking I/O.
67//!
68//! IConnHandler is implemented by users of netio module. This interface is notified
69//! about connection state changes (e.g. connection is established) and availability
70//! of I/O (e.g. connection becomes readable).
71//!
72//! Thread access
73//! -------------
74//!
75//! Methods that are not part of IConn interface are called from within other netio
76//! classes, e.g. TcpServerPort, on the network loop thread.
77//!
78//! Methods from the IConn interface are called by users of netio module from any
79//! thread. They are thread-safe and lock-free.
80//!
81//! Connection type and lifecycle
82//! -----------------------------
83//!
84//! Connection can be client-side (connect call) or server-side (accept call).
85//!
86//! Client-side connection is created using AddTcpClientPort task of the network
87//! loop, and is closed using RemovePort task. Before removing the port, the user
88//! must call async_terminate() and wait until termination is completed.
89//!
90//! Server-side connection is created by TcpServerPort when it receives a new
91//! incoming connection. To remove it, the user should call async_terminate().
92//! When termination is completed, TcpServerPort automatically closes and
93//! destroys connection.
94//!
95//! Connection workflow
96//! -------------------
97//!
98//! The following rules must be followed:
99//!
100//! - if you called open(), even if it failed, you're responsible for calling
101//! async_close() and waiting for its completion before destroying connection
102//! - after calling open(), you should call either accept() or connect() before
103//! using connection
104//! - if you called connect() or accept(), even if it failed, you're responsible
105//! for calling async_terminate() and waiting for its completion before calling
106//! async_close()
107//! - after connection is established and before it's terminated you can
108//! perform I/O
109//! - even if connection can't be established, async_terminate() still should be
110//! called before closing and destroying connection
111//!
112//! Connection FSM
113//! --------------
114//!
115//! TcpConnectionPort maintains an FSM and sees each operation or event handler as a
116//! transition between states. Each operation is allowed only in certain states and
117//! will panic when not used properly.
118//!
119//! State switch mostly happens on the network thread, however some limited set of
120//! transitions is allowed from other threads. For this reason, state switching is
121//! done using atomic operations.
122class TcpConnectionPort : public BasicPort, public IConn {
123public:
124 //! Initialize.
126 uv_loop_t& loop,
128
129 //! Destroy.
131
132 //! Open TCP connection.
133 //! @remarks
134 //! Should be called from network loop thread.
135 virtual bool open();
136
137 //! Asynchronously close TCP connection.
138 //! @remarks
139 //! Should be called from network loop thread.
140 virtual AsyncOperationStatus async_close(ICloseHandler& handler, void* handler_arg);
141
142 //! Establish connection by accepting it from listening socket.
143 //! @remarks
144 //! Should be called from network loop thread.
145 bool accept(const TcpConnectionConfig& config,
146 const address::SocketAddr& server_address,
147 SocketHandle server_socket);
148
149 //! Establish connection to remote peer (asynchronously).
150 //! @remarks
151 //! Should be called from network loop thread.
152 bool connect(const TcpClientConfig& config);
153
154 //! Set termination handler and start using it.
155 //! @remarks
156 //! Should be called from network loop thread.
157 void attach_terminate_handler(ITerminateHandler& handler, void* handler_arg);
158
159 //! Set connection handler and start reporting events to it.
160 //! @remarks
161 //! Should be called from network loop thread.
163
164 //! Return address of the local peer.
165 //! @remarks
166 //! Can be called from any thread.
167 virtual const address::SocketAddr& local_address() const;
168
169 //! Return address of the remote peer.
170 //! @remarks
171 //! Can be called from any thread.
172 virtual const address::SocketAddr& remote_address() const;
173
174 //! Return true if there was a failure.
175 //! @remarks
176 //! Can be called from any thread.
177 virtual bool is_failed() const;
178
179 //! Return true if the connection is writable.
180 //! @remarks
181 //! Can be called from any thread.
182 virtual bool is_writable() const;
183
184 //! Return true if the connection is readable.
185 //! @remarks
186 //! Can be called from any thread.
187 virtual bool is_readable() const;
188
189 //! Write @p buf of size @p len to the connection.
190 //! @remarks
191 //! Can be called from any thread.
192 virtual ssize_t try_write(const void* buf, size_t len);
193
194 //! Read @p len bytes from the connection to @p buf.
195 //! @remarks
196 //! Can be called from any thread.
197 virtual ssize_t try_read(void* buf, size_t len);
198
199 //! Initiate asynchronous graceful shutdown.
200 //! @remarks
201 //! Can be called from any thread.
203
204protected:
205 //! Format descriptor.
207
208private:
209 // States of the connection FSM; operations and event handlers are transitions between them.
210 enum ConnectionState {
211 // not opened yet, or already closed
212 State_Closed,
213
214 // open() is in progress
215 State_Opening,
216
217 // opened, waiting for connect() or accept()
218 State_Opened,
219
220 // accept() or connect() is in progress
221 State_Connecting,
222
223 // asynchronous connection failed; need to terminate and close
224 State_Refused,
225
226 // asynchronous connection succeeded; do I/O, then terminate and close
227 State_Established,
228
229 // failure during I/O; need to terminate and close
230 State_Broken,
231
232 // async_terminate() was called, asynchronous termination is in progress
233 State_Terminating,
234
235 // asynchronous termination completed, ready for closing
236 State_Terminated,
237
238 // async_close() was called, asynchronous close is in progress
239 State_Closing
240 };
241
242 // Read or write readiness status of the socket.
243 enum IoStatus {
244 // socket is not ready for I/O
245 Io_NotAvailable,
246
247 // socket is ready for reading or writing
248 Io_Available,
249
250 // a read or write operation is currently in progress
251 Io_InProgress
252 };
253
254 // I/O statistics; all counters start at zero.
255 struct IoStats {
256 // number of IConnHandler events (read and write)
257 core::Seqlock<uint64_t> rd_events;
258 core::Seqlock<uint64_t> wr_events;
259
260 // number of try_read() and try_write() calls
261 uint64_t rd_calls;
262 uint64_t wr_calls;
263
264 // how many times IOErr_WouldBlock was returned
265 uint64_t rd_wouldblock;
266 uint64_t wr_wouldblock;
267
268 // number of bytes transferred (read and written)
269 uint64_t rd_bytes;
270 uint64_t wr_bytes;
271
272 IoStats()
273 : rd_events(0)
274 , wr_events(0)
275 , rd_calls(0)
276 , wr_calls(0)
277 , rd_wouldblock(0)
278 , wr_wouldblock(0)
279 , rd_bytes(0)
280 , wr_bytes(0) {
281 }
282 };
283
284 static const char* conn_state_to_str_(ConnectionState);
285
286 static void poll_cb_(uv_poll_t* handle, int status, int events);
287 static void start_terminate_cb_(uv_async_t* handle);
288 static void finish_terminate_cb_(uv_handle_t* handle);
289 static void close_cb_(uv_handle_t* handle);
290
291 bool start_polling_();
292 AsyncOperationStatus async_stop_polling_(uv_close_cb completion_cb);
293
294 void disconnect_socket_();
295
296 AsyncOperationStatus async_close_();
297
298 void set_and_report_writable_();
299 void set_and_report_readable_();
300
301 ConnectionState get_state_() const;
302 void switch_and_report_state_(ConnectionState new_state);
303 bool maybe_switch_state_(ConnectionState expected_state,
304 ConnectionState desired_state);
305 void report_state_(ConnectionState state);
306
307 void set_conn_handler_(IConnHandler& handler);
308 void unset_conn_handler_();
309
310 void check_usable_(ConnectionState conn_state) const;
311 void check_usable_for_io_(ConnectionState conn_state) const;
312
313 void report_io_stats_();
314
315 uv_loop_t& loop_;
316
317 uv_poll_t poll_handle_;
318 bool poll_handle_initialized_;
319 bool poll_handle_started_;
320
321 uv_async_t terminate_sem_;
322 bool terminate_sem_initialized_;
323
324 IConnHandler* conn_handler_;
325
326 ITerminateHandler* terminate_handler_;
327 void* terminate_handler_arg_;
328
329 ICloseHandler* close_handler_;
330 void* close_handler_arg_;
331
332 TcpConnectionType type_;
333
334 address::SocketAddr local_address_;
335 address::SocketAddr remote_address_;
336
337 SocketHandle socket_;
338
339 core::Atomic<int32_t> conn_state_;
340
341 core::Atomic<int32_t> conn_was_established_;
342 core::Atomic<int32_t> conn_was_failed_;
343
344 core::Atomic<int32_t> writable_status_;
345 core::Atomic<int32_t> readable_status_;
346
347 bool got_stream_end_;
348
349 core::Mutex io_mutex_;
350
351 IoStats io_stats_;
352 core::RateLimiter report_limiter_;
353};
354
355} // namespace netio
356} // namespace roc
357
358#endif // ROC_NETIO_TCP_CONNECTION_PORT_H_
Atomic.
Base class for ports.
Memory allocator interface.
Definition iallocator.h:23
IAllocator & allocator() const
Get allocator.
Base class for ports.
Definition basic_port.h:40
Close handler interface.
Connection event handler interface.
Connection interface.
Definition iconn.h:30
Termination handler interface.
void attach_connection_handler(IConnHandler &handler)
Set connection handler and start reporting events to it.
virtual const address::SocketAddr & local_address() const
Return address of the local peer.
virtual ssize_t try_read(void *buf, size_t len)
Read len bytes from the connection to buf.
bool connect(const TcpClientConfig &config)
Establish connection to remote peer (asynchronously).
virtual bool is_failed() const
Return true if there was a failure.
virtual AsyncOperationStatus async_close(ICloseHandler &handler, void *handler_arg)
Asynchronously close TCP connection.
TcpConnectionPort(TcpConnectionType type, uv_loop_t &loop, core::IAllocator &allocator)
Initialize.
virtual bool open()
Open TCP connection.
virtual bool is_readable() const
Return true if the connection is readable.
bool accept(const TcpConnectionConfig &config, const address::SocketAddr &server_address, SocketHandle server_socket)
Establish connection by accepting it from listening socket.
virtual ~TcpConnectionPort()
Destroy.
virtual void async_terminate(TerminationMode mode)
Initiate asynchronous graceful shutdown.
virtual const address::SocketAddr & remote_address() const
Return address of the remote peer.
virtual void format_descriptor(core::StringBuilder &b)
Format descriptor.
virtual bool is_writable() const
Return true if the connection is writable.
void attach_terminate_handler(ITerminateHandler &handler, void *handler_arg)
Set termination handler and start using it.
virtual ssize_t try_write(const void *buf, size_t len)
Write buf of size len to the connection.
Close handler interface.
Connection interface.
Connection event handler interface.
Termination handler interface.
Mutex.
int SocketHandle
Platform-specific socket handle.
Definition socket_ops.h:30
AsyncOperationStatus
Asynchronous operation status.
TcpConnectionType
TCP connection type.
@ TcpConn_Client
Local peer is client, remote peer is server.
@ TcpConn_Server
Local peer is server, remote peer is client.
TerminationMode
Connection termination mode.
Root namespace.
Rate limiter.
Seqlock.
Socket address.
Socket operations.
TCP connection parameters.
address::SocketAddr local_address
Local peer address to which we're bound.
address::SocketAddr remote_address
Remote peer address to which we're connected.
TCP connection parameters.
SocketOptions socket_options
Socket options.