stream-transport-impl.hpp
/* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
/*
 * Copyright (c) 2013-2022 Regents of the University of California.
 *
 * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
 *
 * ndn-cxx library is free software: you can redistribute it and/or modify it under the
 * terms of the GNU Lesser General Public License as published by the Free Software
 * Foundation, either version 3 of the License, or (at your option) any later version.
 *
 * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
 * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
 *
 * You should have received copies of the GNU General Public License and GNU Lesser
 * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
 * <http://www.gnu.org/licenses/>.
 *
 * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
 */

#ifndef NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
#define NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP

#include "ndn-cxx/transport/transport.hpp"

#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>

#include <list>
#include <queue>

namespace ndn {
namespace detail {

/** \brief Implementation detail of a Boost.Asio-based stream-oriented transport.
 */
template<typename BaseTransport, typename Protocol>
class StreamTransportImpl : public std::enable_shared_from_this<StreamTransportImpl<BaseTransport, Protocol>>
{
public:
  using TransmissionQueue = std::queue<Block, std::list<Block>>;

  StreamTransportImpl(BaseTransport& transport, boost::asio::io_service& ioService)
    : m_transport(transport)
    , m_socket(ioService)
    , m_connectTimer(ioService)
  {
  }

  void
  connect(const typename Protocol::endpoint& endpoint)
  {
    if (m_transport.getState() == Transport::State::CONNECTING) {
      return;
    }

    m_transport.setState(Transport::State::CONNECTING);

    // Wait at most 4 seconds to connect
    m_connectTimer.expires_from_now(std::chrono::seconds(4));
    m_connectTimer.async_wait([self = this->shared_from_this()] (const auto& error) {
      self->connectTimeoutHandler(error);
    });

    m_socket.open();
    m_socket.async_connect(endpoint, [self = this->shared_from_this()] (const auto& error) {
      self->connectHandler(error);
    });
  }

  void
  close()
  {
    m_transport.setState(Transport::State::CLOSED);

    boost::system::error_code error; // to silently ignore all errors
    m_connectTimer.cancel(error);
    m_socket.cancel(error);
    m_socket.close(error);

    TransmissionQueue{}.swap(m_transmissionQueue); // clear the queue
  }

  void
  pause()
  {
    if (m_transport.getState() == Transport::State::RUNNING) {
      m_socket.cancel();
      m_transport.setState(Transport::State::PAUSED);
    }
  }

  void
  resume()
  {
    if (m_transport.getState() == Transport::State::PAUSED) {
      m_transport.setState(Transport::State::RUNNING);
      m_inputBufferSize = 0;
      asyncReceive();
    }
  }

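  /** \brief Queue \p block for transmission.
   *
   *  Writes are serialized: a new asyncWrite is started only when the queue transitions
   *  from empty to non-empty and the transport is neither closed nor still connecting.
   */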
  void
  send(const Block& block)
  {
    m_transmissionQueue.push(block);

    if (m_transport.getState() != Transport::State::CLOSED &&
        m_transport.getState() != Transport::State::CONNECTING &&
        m_transmissionQueue.size() == 1) {
      asyncWrite();
    }
    // if not connected or there's another transmission in progress (m_transmissionQueue.size() > 1),
    // the next write will be scheduled either in connectHandler or in asyncWriteHandler
  }

protected:
  void
  connectHandler(const boost::system::error_code& error)
  {
    m_connectTimer.cancel();

    if (error) {
      m_transport.close();
      NDN_THROW(Transport::Error(error, "error while connecting to the forwarder"));
    }

    m_transport.setState(Transport::State::PAUSED);

    if (!m_transmissionQueue.empty()) {
      resume();
      asyncWrite();
    }
  }

  void
  connectTimeoutHandler(const boost::system::error_code& error)
  {
    if (error) // e.g., cancelled timer
      return;

    m_transport.close();
    NDN_THROW(Transport::Error(error, "error while connecting to the forwarder"));
  }

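  // Only one async_write is in flight at any time; the Block at the front of
  // m_transmissionQueue is the element currently being written.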
  void
  asyncWrite()
  {
    BOOST_ASSERT(!m_transmissionQueue.empty());
    boost::asio::async_write(m_socket, boost::asio::buffer(m_transmissionQueue.front()),
      // capture a copy of the shared_ptr to "this" to prevent deallocation
      [this, self = this->shared_from_this()] (const auto& error, size_t) {
        if (error) {
          if (error == boost::system::errc::operation_canceled) {
            // async write has been explicitly cancelled (e.g., socket close)
            return;
          }
          m_transport.close();
          NDN_THROW(Transport::Error(error, "error while writing data to socket"));
        }

        if (m_transport.getState() == Transport::State::CLOSED) {
          return; // queue has already been cleared
        }

        BOOST_ASSERT(!m_transmissionQueue.empty());
        m_transmissionQueue.pop();

        if (!m_transmissionQueue.empty()) {
          asyncWrite();
        }
      });
  }

  void
  asyncReceive()
  {
    m_socket.async_receive(boost::asio::buffer(m_inputBuffer + m_inputBufferSize,
                                               MAX_NDN_PACKET_SIZE - m_inputBufferSize), 0,
      // capture a copy of the shared_ptr to "this" to prevent deallocation
      [this, self = this->shared_from_this()] (const auto& error, size_t nBytesRecvd) {
        if (error) {
          if (error == boost::system::errc::operation_canceled) {
            // async receive has been explicitly cancelled (e.g., socket close)
            return;
          }
          m_transport.close();
          NDN_THROW(Transport::Error(error, "error while receiving data from socket"));
        }

        m_inputBufferSize += nBytesRecvd;

        // decode and deliver as many complete TLV elements as the buffer now holds
        std::size_t offset = 0;
        bool hasProcessedSome = processAllReceived(m_inputBuffer, offset, m_inputBufferSize);
        if (!hasProcessedSome && m_inputBufferSize == MAX_NDN_PACKET_SIZE && offset == 0) {
          m_transport.close();
          NDN_THROW(Transport::Error("input buffer full, but a valid TLV cannot be decoded"));
        }

        if (offset > 0) {
          // shift any unparsed remainder to the front of the buffer
          if (offset != m_inputBufferSize) {
            std::copy(m_inputBuffer + offset, m_inputBuffer + m_inputBufferSize, m_inputBuffer);
            m_inputBufferSize -= offset;
          }
          else {
            m_inputBufferSize = 0;
          }
        }

        asyncReceive();
      });
  }

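  /** \brief Decode and dispatch all complete TLV elements found in \p buffer.
   *  \param buffer receive buffer
   *  \param offset [in,out] position of the first unparsed byte; advanced past every
   *                element that is successfully decoded and delivered
   *  \param nBytesAvailable number of valid bytes in \p buffer
   *  \return true if all available bytes were consumed, false if decoding stopped at an
   *          incomplete or invalid element
   */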
  bool
  processAllReceived(uint8_t* buffer, size_t& offset, size_t nBytesAvailable)
  {
    while (offset < nBytesAvailable) {
      bool isOk = false;
      Block element;
      std::tie(isOk, element) = Block::fromBuffer({buffer + offset, nBytesAvailable - offset});
      if (!isOk)
        return false;

      m_transport.m_receiveCallback(element);
      offset += element.size();
    }
    return true;
  }

protected:
  BaseTransport& m_transport;

  typename Protocol::socket m_socket;
  uint8_t m_inputBuffer[MAX_NDN_PACKET_SIZE];
  size_t m_inputBufferSize = 0;
  TransmissionQueue m_transmissionQueue;
  boost::asio::steady_timer m_connectTimer;
};

} // namespace detail
} // namespace ndn

#endif // NDN_CXX_TRANSPORT_DETAIL_STREAM_TRANSPORT_IMPL_HPP
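For orientation, the sketch below shows one way a concrete transport could drive this implementation detail. MyTransport and exampleUsage are placeholders invented for this illustration and are not ndn-cxx API; the only assumption made about MyTransport is that it is a Transport subclass satisfying the BaseTransport requirements already used above (getState(), setState(), close(), and m_receiveCallback visible to the impl). The include path is inferred from the header guard. The real ndn-cxx stream transports instantiate the template in a similar fashion.

// Hypothetical usage sketch; MyTransport is an assumed Transport subclass, not real ndn-cxx API.
#include "ndn-cxx/transport/detail/stream-transport-impl.hpp"

#include <boost/asio/io_service.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <memory>

void
exampleUsage(ndn::MyTransport& transport, boost::asio::io_service& io,
             const boost::asio::ip::tcp::endpoint& forwarder, const ndn::Block& packet)
{
  using Impl = ndn::detail::StreamTransportImpl<ndn::MyTransport, boost::asio::ip::tcp>;

  // handlers capture shared_from_this(), so the impl must be owned by a shared_ptr
  auto impl = std::make_shared<Impl>(transport, io);

  impl->connect(forwarder); // async_connect guarded by the 4-second connect timer
  impl->send(packet);       // queued now; written from connectHandler once connected

  // io.run() drives the asynchronous operations; received TLV elements are delivered
  // through the transport's receive callback until the transport is closed
  io.run();
}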