forked from sccn/liblsl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtcp_server.h
More file actions
135 lines (111 loc) · 5.15 KB
/
Copy pathtcp_server.h
File metadata and controls
135 lines (111 loc) · 5.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#ifndef TCP_SERVER_H
#define TCP_SERVER_H
#include "forward.h"
#include "socket_utils.h"
#include <asio/buffer.hpp>
#include <atomic>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <vector>
using asio::ip::tcp;
using err_t = const asio::error_code &;
namespace lsl {
/// shared pointer to a socket
using tcp_socket_p = std::shared_ptr<tcp_socket>;
/// shared pointer to an acceptor socket
using tcp_acceptor_p = std::unique_ptr<tcp_acceptor>;
/// Forward declaration for synchronous write handler
class sync_write_handler;
/**
* The TCP data server.
*
* Acts as a TCP server on a free port (in the configured port range), and understands the following
* messages:
* - `LSL:streamfeed`: A request to receive streaming data on the connection. The server responds
* with the shortinfo, two samples filled with a test pattern, followed by samples until the server
* outlet goes out of existence.
* - `LSL:fullinfo`: A request for the stream_info served by this server.
* - `LSL:shortinfo`: A request for the stream_info served by this server if matching the provided
* query string. The short version of the stream_info (empty `<desc>` element) is returned.
*/
class tcp_server : public std::enable_shared_from_this<tcp_server> {
public:
/**
* Construct a new TCP server for a stream outlet.
*
* This opens a new TCP server port (in the allowed range) and, if successful, updates the
* stream_info object with the data of this connection. To have it serve connection requests,
* the member function begin_serving() must be called once. The latter should ideally not be
* done before the UDP service port has been successfully initialized, as well.
* @param info A stream_info that is shared with other server objects.
* @param io An io_context that is shared with other server objects.
* @param sendbuf A send buffer that is shared with other server objects.
* @param factory A sample factory that is shared with other server objects.
* @param protocol The protocol (IPv4 or IPv6) that shall be serviced by this server.
* @param chunk_size The preferred chunk size, in samples. If 0, the pushthrough flag determines
* the effective chunking.
* @param do_sync If true, use synchronous (blocking) socket writes for zero-copy transfer.
*/
tcp_server(stream_info_impl_p info, io_context_p io, send_buffer_p sendbuf, factory_p factory,
int chunk_size, bool allow_v4, bool allow_v6, bool do_sync = false);
/// Destructor (must be defined in .cpp due to unique_ptr to incomplete type)
~tcp_server();
/**
* Begin serving TCP connections.
*
* Should not be called before info_ has been fully initialized by all involved parties
* (tcp_server, udp_server) since no modifications to the stream_info thereafter are permitted.
*/
void begin_serving();
/**
* Initiate teardown of IO processes.
*
* The actual teardown will be performed by the IO thread that runs the operations of
* this server.
*/
void end_serving();
/**
* Write buffers to all connected sync sockets (blocking).
* Only valid when the server was created with do_sync=true.
* @param bufs Vector of const_buffers to write (gather-write).
*/
void write_all_blocking(const std::vector<asio::const_buffer> &bufs);
/// Check if this server is in sync mode
bool is_sync_mode() const { return sync_handler_ != nullptr; }
/// Check if there are any sync consumers connected (only valid if is_sync_mode()).
/// Note: disconnected consumers are detected lazily, on the next write_all_blocking that
/// fails to reach them, so this may briefly keep reporting a consumer that has gone away.
bool have_sync_consumers() const;
private:
friend class client_session;
/// Start accepting a new connection.
void accept_next_connection(tcp_acceptor_p &acceptor);
/// Register an in-flight (active) session with the server (so that we can close it when
/// a shutdown is requested externally).
void register_inflight_session(const std::shared_ptr<class client_session> &session);
void unregister_inflight_session(client_session *session);
/// Post a close of all in-flight sockets.
void close_inflight_sessions();
// data used by the transfer threads
int chunk_size_; // the chunk size to use (or 0)
// data shared with the outlet
stream_info_impl_p info_; // shared stream_info object
io_context_p io_; // shared ptr to IO service; ensures that the IO is still around by the time
// the acceptor needs to be destroyed
factory_p factory_; // reference to the sample factory (which owns the samples)
send_buffer_p send_buffer_; // the send buffer, shared with other TCP's and the outlet
// acceptor socket
tcp_acceptor_p acceptor_v4_, acceptor_v6_; // our server socket
// registry of in-flight asessions (for cancellation)
std::map<void *, std::weak_ptr<client_session>> inflight_;
std::recursive_mutex inflight_mut_; // mutex protecting the registry from concurrent access
// some cached data
std::string shortinfo_msg_; // pre-computed short-info server response
std::string fullinfo_msg_; // pre-computed full-info server response
// synchronous write handler (only set when do_sync=true)
std::unique_ptr<sync_write_handler> sync_handler_;
};
} // namespace lsl
#endif