stream-transport.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2022, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #ifndef NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
27 #define NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
28 
29 #include "transport.hpp"
30 #include "socket-utils.hpp"
31 #include "common/global.hpp"
32 
33 #include <queue>
34 
35 #include <boost/asio/write.hpp>
36 
37 namespace nfd::face {
38 
43 template<class Protocol>
44 class StreamTransport : public Transport
45 {
46 public:
47  using protocol = Protocol;
48 
53  explicit
54  StreamTransport(typename protocol::socket&& socket);
55 
56  ssize_t
57  getSendQueueLength() override;
58 
59 protected:
60  void
61  doClose() override;
62 
63  void
65 
66  void
67  doSend(const Block& packet) override;
68 
69  void
71 
72  void
73  handleSend(const boost::system::error_code& error,
74  size_t nBytesSent);
75 
76  void
78 
79  void
80  handleReceive(const boost::system::error_code& error,
81  size_t nBytesReceived);
82 
83  void
84  processErrorCode(const boost::system::error_code& error);
85 
86  virtual void
87  handleError(const boost::system::error_code& error);
88 
89  void
91 
92  void
94 
95  size_t
97 
98 protected:
99  typename protocol::socket m_socket;
100 
102 
103 private:
104  uint8_t m_receiveBuffer[ndn::MAX_NDN_PACKET_SIZE];
105  size_t m_receiveBufferSize;
106  std::queue<Block> m_sendQueue;
107  size_t m_sendQueueBytes;
108 };
109 
110 
111 template<class T>
112 StreamTransport<T>::StreamTransport(typename StreamTransport::protocol::socket&& socket)
113  : m_socket(std::move(socket))
114  , m_receiveBufferSize(0)
115  , m_sendQueueBytes(0)
116 {
117  // No queue capacity is set because there is no theoretical limit to the size of m_sendQueue.
118  // Therefore, protecting against send queue overflows is less critical than in other transport
119  // types. Instead, we use the default threshold specified in the GenericLinkService options.
120 
121  startReceive();
122 }
123 
124 template<class T>
125 ssize_t
127 {
128  ssize_t queueLength = getTxQueueLength(m_socket.native_handle());
129  if (queueLength == QUEUE_ERROR) {
130  NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
131  }
132  return getSendQueueBytes() + std::max<ssize_t>(0, queueLength);
133 }
134 
135 template<class T>
136 void
138 {
139  NFD_LOG_FACE_TRACE(__func__);
140 
141  if (m_socket.is_open()) {
142  // Cancel all outstanding operations and shutdown the socket
143  // so that no further sends or receives are possible.
144  // Use the non-throwing variants and ignore errors, if any.
145  boost::system::error_code error;
146  m_socket.cancel(error);
147  m_socket.shutdown(protocol::socket::shutdown_both, error);
148  }
149 
150  // Ensure that the Transport stays alive at least until
151  // all pending handlers are dispatched
152  getGlobalIoService().post([this] { deferredClose(); });
153 
154  // Some bug or feature of Boost.Asio (see https://redmine.named-data.net/issues/1856):
155  //
156  // When doClose is called from a socket event handler (e.g., from handleReceive),
157  // m_socket.shutdown() does not trigger the cancellation of the handleSend callback.
158  // Instead, handleSend is invoked as nothing bad happened.
159  //
160  // In order to prevent the assertion in handleSend from failing, we clear the queue
161  // and close the socket in deferredClose, i.e., after all callbacks scheduled up to
162  // this point have been executed. If more send operations are scheduled after this
163  // point, they will fail because the socket has been shutdown, and their callbacks
164  // will be invoked with error code == asio::error::shut_down.
165 }
166 
167 template<class T>
168 void
170 {
171  NFD_LOG_FACE_TRACE(__func__);
172 
173  resetSendQueue();
174 
175  // use the non-throwing variant and ignore errors, if any
176  boost::system::error_code error;
177  m_socket.close(error);
178 
179  this->setState(TransportState::CLOSED);
180 }
181 
182 template<class T>
183 void
184 StreamTransport<T>::doSend(const Block& packet)
185 {
186  NFD_LOG_FACE_TRACE(__func__);
187 
188  if (getState() != TransportState::UP)
189  return;
190 
191  bool wasQueueEmpty = m_sendQueue.empty();
192  m_sendQueue.push(packet);
193  m_sendQueueBytes += packet.size();
194 
195  if (wasQueueEmpty)
196  sendFromQueue();
197 }
198 
199 template<class T>
200 void
202 {
203  boost::asio::async_write(m_socket, boost::asio::buffer(m_sendQueue.front()),
204  [this] (auto&&... args) { this->handleSend(std::forward<decltype(args)>(args)...); });
205 }
206 
207 template<class T>
208 void
209 StreamTransport<T>::handleSend(const boost::system::error_code& error,
210  size_t nBytesSent)
211 {
212  if (error)
213  return processErrorCode(error);
214 
215  NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
216 
217  BOOST_ASSERT(!m_sendQueue.empty());
218  BOOST_ASSERT(m_sendQueue.front().size() == nBytesSent);
219  m_sendQueueBytes -= nBytesSent;
220  m_sendQueue.pop();
221 
222  if (!m_sendQueue.empty())
223  sendFromQueue();
224 }
225 
226 template<class T>
227 void
229 {
230  BOOST_ASSERT(getState() == TransportState::UP);
231 
232  m_socket.async_receive(boost::asio::buffer(m_receiveBuffer + m_receiveBufferSize,
233  ndn::MAX_NDN_PACKET_SIZE - m_receiveBufferSize),
234  [this] (auto&&... args) { this->handleReceive(std::forward<decltype(args)>(args)...); });
235 }
236 
237 template<class T>
238 void
239 StreamTransport<T>::handleReceive(const boost::system::error_code& error,
240  size_t nBytesReceived)
241 {
242  if (error)
243  return processErrorCode(error);
244 
245  NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes");
246 
247  m_receiveBufferSize += nBytesReceived;
248  auto bufferView = ndn::make_span(m_receiveBuffer, m_receiveBufferSize);
249  size_t offset = 0;
250  bool isOk = true;
251  while (offset < bufferView.size()) {
252  Block element;
253  std::tie(isOk, element) = Block::fromBuffer(bufferView.subspan(offset));
254  if (!isOk)
255  break;
256 
257  offset += element.size();
258  BOOST_ASSERT(offset <= bufferView.size());
259 
260  this->receive(element);
261  }
262 
263  if (!isOk && m_receiveBufferSize == ndn::MAX_NDN_PACKET_SIZE && offset == 0) {
264  NFD_LOG_FACE_ERROR("Failed to parse incoming packet or packet too large to process");
265  this->setState(TransportState::FAILED);
266  doClose();
267  return;
268  }
269 
270  if (offset > 0) {
271  if (offset != m_receiveBufferSize) {
272  std::copy(m_receiveBuffer + offset, m_receiveBuffer + m_receiveBufferSize, m_receiveBuffer);
273  m_receiveBufferSize -= offset;
274  }
275  else {
276  m_receiveBufferSize = 0;
277  }
278  }
279 
280  startReceive();
281 }
282 
283 template<class T>
284 void
285 StreamTransport<T>::processErrorCode(const boost::system::error_code& error)
286 {
287  NFD_LOG_FACE_TRACE(__func__);
288 
289  if (getState() == TransportState::CLOSING ||
290  getState() == TransportState::FAILED ||
291  getState() == TransportState::CLOSED ||
292  error == boost::asio::error::operation_aborted || // when cancel() is called
293  error == boost::asio::error::shut_down) // after shutdown() is called
294  // transport is shutting down, ignore any errors
295  return;
296 
297  handleError(error);
298 }
299 
300 template<class T>
301 void
302 StreamTransport<T>::handleError(const boost::system::error_code& error)
303 {
304  if (error == boost::asio::error::eof) {
305  this->setState(TransportState::CLOSING);
306  }
307  else {
308  NFD_LOG_FACE_ERROR("Send or receive operation failed: " << error.message());
309  this->setState(TransportState::FAILED);
310  }
311  doClose();
312 }
313 
314 template<class T>
315 void
317 {
318  m_receiveBufferSize = 0;
319 }
320 
321 template<class T>
322 void
324 {
325  std::queue<Block> emptyQueue;
326  std::swap(emptyQueue, m_sendQueue);
327  m_sendQueueBytes = 0;
328 }
329 
330 template<class T>
331 size_t
333 {
334  return m_sendQueueBytes;
335 }
336 
337 } // namespace nfd::face
338 
339 #endif // NFD_DAEMON_FACE_STREAM_TRANSPORT_HPP
Implements Transport for stream-based protocols.
virtual void handleError(const boost::system::error_code &error)
void doClose() override
Performs Transport specific operations to close the transport.
void handleReceive(const boost::system::error_code &error, size_t nBytesReceived)
ssize_t getSendQueueLength() override
Returns the current send queue length of the transport (in octets).
void doSend(const Block &packet) override
Performs Transport specific operations to send a packet.
void processErrorCode(const boost::system::error_code &error)
void handleSend(const boost::system::error_code &error, size_t nBytesSent)
StreamTransport(typename protocol::socket &&socket)
Construct stream transport.
The lower half of a Face.
Definition: transport.hpp:114
#define NFD_LOG_FACE_ERROR(msg)
Log a message at ERROR level.
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
#define NFD_LOG_MEMBER_DECL()
Definition: logger.hpp:32
@ CLOSED
the transport is closed, and can be safely deallocated
@ CLOSING
the transport is being closed gracefully, either by the peer or by a call to close()
@ FAILED
the transport is being closed due to a failure
@ UP
the transport is up and can transmit packets
constexpr ssize_t QUEUE_ERROR
Indicates that the transport was unable to retrieve the queue capacity/length.
Definition: transport.hpp:107
ssize_t getTxQueueLength(int fd)
Obtain send queue length from a specified system socket.
boost::asio::io_service & getGlobalIoService()
Returns the global io_service instance for the calling thread.
Definition: global.cpp:36