forked from mutability/dump978
-
Notifications
You must be signed in to change notification settings - Fork 17
/
socket_output.h
109 lines (79 loc) · 3.9 KB
/
socket_output.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// -*- c++ -*-
// Copyright (c) 2019, FlightAware LLC.
// All rights reserved.
// Licensed under the 2-clause BSD license; see the LICENSE file
#ifndef SOCKET_OUTPUT_H
#define SOCKET_OUTPUT_H
#include <memory>
#include <sstream>
#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/strand.hpp>
#include "message_dispatch.h"
#include "uat_message.h"
namespace flightaware::uat {
class SocketOutput : public std::enable_shared_from_this<SocketOutput> {
public:
typedef std::shared_ptr<SocketOutput> Pointer;
virtual void Start();
void Write(SharedMessageVector messages);
virtual void Close();
void SetCloseNotifier(std::function<void()> notifier) { close_notifier_ = notifier; }
bool IsOpen() const { return socket_.is_open(); }
protected:
SocketOutput(boost::asio::io_service &service_, boost::asio::ip::tcp::socket &&socket_);
std::ostringstream &Buf() { return outbuf_; }
virtual void InternalWrite(SharedMessageVector messages) = 0;
private:
void HandleError(const boost::system::error_code &ec);
void Flush();
void ReadAndDiscard();
boost::asio::io_service &service_;
boost::asio::io_service::strand strand_;
boost::asio::ip::tcp::socket socket_;
boost::asio::ip::tcp::endpoint peer_;
std::ostringstream outbuf_;
bool flush_pending_;
std::function<void()> close_notifier_;
};
class RawOutput : public SocketOutput {
public:
// factory method, this class must always be constructed via make_shared
static Pointer Create(boost::asio::io_service &service, boost::asio::ip::tcp::socket &&socket, SharedMessageVector header) { return Pointer(new RawOutput(service, std::move(socket), header)); }
void Start() override;
protected:
void InternalWrite(SharedMessageVector messages) override;
private:
RawOutput(boost::asio::io_service &service_, boost::asio::ip::tcp::socket &&socket_, SharedMessageVector header) : SocketOutput(service_, std::move(socket_)) { header_ = header; }
SharedMessageVector header_;
};
class JsonOutput : public SocketOutput {
public:
// factory method, this class must always be constructed via make_shared
static Pointer Create(boost::asio::io_service &service, boost::asio::ip::tcp::socket &&socket) { return Pointer(new JsonOutput(service, std::move(socket))); }
protected:
void InternalWrite(SharedMessageVector messages) override;
private:
JsonOutput(boost::asio::io_service &service_, boost::asio::ip::tcp::socket &&socket_) : SocketOutput(service_, std::move(socket_)) {}
};
class SocketListener : public std::enable_shared_from_this<SocketListener> {
public:
typedef std::shared_ptr<SocketListener> Pointer;
typedef std::function<SocketOutput::Pointer(boost::asio::io_service &, boost::asio::ip::tcp::socket &&)> ConnectionFactory;
// factory method, this class must always be constructed via make_shared
static Pointer Create(boost::asio::io_service &service, const boost::asio::ip::tcp::endpoint &endpoint, MessageDispatch &dispatch, ConnectionFactory factory) { return Pointer(new SocketListener(service, endpoint, dispatch, factory)); }
void Start();
void Close();
private:
SocketListener(boost::asio::io_service &service, const boost::asio::ip::tcp::endpoint &endpoint, MessageDispatch &dispatch, ConnectionFactory factory);
void Accept();
boost::asio::io_service &service_;
boost::asio::ip::tcp::acceptor acceptor_;
boost::asio::ip::tcp::endpoint endpoint_;
boost::asio::ip::tcp::socket socket_;
boost::asio::ip::tcp::endpoint peer_;
MessageDispatch &dispatch_;
ConnectionFactory factory_;
};
}; // namespace flightaware::uat
#endif