datagram-transport.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2022, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #ifndef NFD_DAEMON_FACE_DATAGRAM_TRANSPORT_HPP
27 #define NFD_DAEMON_FACE_DATAGRAM_TRANSPORT_HPP
28 
29 #include "transport.hpp"
30 #include "socket-utils.hpp"
31 #include "common/global.hpp"
32 
33 #include <array>
34 
35 namespace nfd::face {
36 
37 struct Unicast {};
38 struct Multicast {};
39 
46 template<class Protocol, class Addressing>
48 {
49 public:
50  using protocol = Protocol;
51  using addressing = Addressing;
52 
57  explicit
58  DatagramTransport(typename protocol::socket&& socket);
59 
60  ssize_t
61  getSendQueueLength() override;
62 
66  void
67  receiveDatagram(span<const uint8_t> buffer, const boost::system::error_code& error);
68 
69 protected:
70  void
71  doClose() override;
72 
73  void
74  doSend(const Block& packet) override;
75 
76  void
77  handleSend(const boost::system::error_code& error, size_t nBytesSent);
78 
79  void
80  handleReceive(const boost::system::error_code& error, size_t nBytesReceived);
81 
82  void
83  processErrorCode(const boost::system::error_code& error);
84 
85  bool
87 
88  void
90 
91 protected:
92  typename protocol::socket m_socket;
93  typename protocol::endpoint m_sender;
94 
96 
97 private:
98  std::array<uint8_t, ndn::MAX_NDN_PACKET_SIZE> m_receiveBuffer;
99  bool m_hasRecentlyReceived;
100 };
101 
102 
103 template<class T, class U>
104 DatagramTransport<T, U>::DatagramTransport(typename DatagramTransport::protocol::socket&& socket)
105  : m_socket(std::move(socket))
106  , m_hasRecentlyReceived(false)
107 {
108  boost::asio::socket_base::send_buffer_size sendBufferSizeOption;
109  boost::system::error_code error;
110  m_socket.get_option(sendBufferSizeOption, error);
111  if (error) {
112  NFD_LOG_FACE_WARN("Failed to obtain send queue capacity from socket: " << error.message());
114  }
115  else {
116  this->setSendQueueCapacity(sendBufferSizeOption.value());
117  }
118 
119  m_socket.async_receive_from(boost::asio::buffer(m_receiveBuffer), m_sender,
120  [this] (auto&&... args) {
121  this->handleReceive(std::forward<decltype(args)>(args)...);
122  });
123 }
124 
125 template<class T, class U>
126 ssize_t
128 {
129  ssize_t queueLength = getTxQueueLength(m_socket.native_handle());
130  if (queueLength == QUEUE_ERROR) {
131  NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
132  }
133  return queueLength;
134 }
135 
136 template<class T, class U>
137 void
139 {
140  NFD_LOG_FACE_TRACE(__func__);
141 
142  if (m_socket.is_open()) {
143  // Cancel all outstanding operations and close the socket.
144  // Use the non-throwing variants and ignore errors, if any.
145  boost::system::error_code error;
146  m_socket.cancel(error);
147  m_socket.close(error);
148  }
149 
150  // Ensure that the Transport stays alive at least until
151  // all pending handlers are dispatched
152  getGlobalIoService().post([this] {
153  this->setState(TransportState::CLOSED);
154  });
155 }
156 
157 template<class T, class U>
158 void
160 {
161  NFD_LOG_FACE_TRACE(__func__);
162 
163  m_socket.async_send(boost::asio::buffer(packet),
164  // 'packet' is copied into the lambda to retain the underlying Buffer
165  [this, packet] (auto&&... args) {
166  this->handleSend(std::forward<decltype(args)>(args)...);
167  });
168 }
169 
170 template<class T, class U>
171 void
172 DatagramTransport<T, U>::receiveDatagram(span<const uint8_t> buffer,
173  const boost::system::error_code& error)
174 {
175  if (error)
176  return processErrorCode(error);
177 
178  NFD_LOG_FACE_TRACE("Received: " << buffer.size() << " bytes from " << m_sender);
179 
180  auto [isOk, element] = Block::fromBuffer(buffer);
181  if (!isOk) {
182  NFD_LOG_FACE_WARN("Failed to parse incoming packet from " << m_sender);
183  // This packet won't extend the face lifetime
184  return;
185  }
186  if (element.size() != buffer.size()) {
187  NFD_LOG_FACE_WARN("Received datagram size and decoded element size don't match");
188  // This packet won't extend the face lifetime
189  return;
190  }
191  m_hasRecentlyReceived = true;
192 
193  if constexpr (std::is_same_v<addressing, Multicast>)
194  this->receive(element, m_sender);
195  else
196  this->receive(element);
197 }
198 
199 template<class T, class U>
200 void
201 DatagramTransport<T, U>::handleReceive(const boost::system::error_code& error, size_t nBytesReceived)
202 {
203  receiveDatagram(ndn::make_span(m_receiveBuffer).first(nBytesReceived), error);
204 
205  if (m_socket.is_open())
206  m_socket.async_receive_from(boost::asio::buffer(m_receiveBuffer), m_sender,
207  [this] (auto&&... args) {
208  this->handleReceive(std::forward<decltype(args)>(args)...);
209  });
210 }
211 
212 template<class T, class U>
213 void
214 DatagramTransport<T, U>::handleSend(const boost::system::error_code& error, size_t nBytesSent)
215 {
216  if (error)
217  return processErrorCode(error);
218 
219  NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
220 }
221 
222 template<class T, class U>
223 void
224 DatagramTransport<T, U>::processErrorCode(const boost::system::error_code& error)
225 {
226  NFD_LOG_FACE_TRACE(__func__);
227 
228  if (getState() == TransportState::CLOSING ||
229  getState() == TransportState::FAILED ||
230  getState() == TransportState::CLOSED ||
231  error == boost::asio::error::operation_aborted) {
232  // transport is shutting down, ignore any errors
233  return;
234  }
235 
236  if (getPersistency() == ndn::nfd::FACE_PERSISTENCY_PERMANENT) {
237  NFD_LOG_FACE_DEBUG("Permanent face ignores error: " << error.message());
238  return;
239  }
240 
241  NFD_LOG_FACE_ERROR("Send or receive operation failed: " << error.message());
242  this->setState(TransportState::FAILED);
243  doClose();
244 }
245 
246 template<class T, class U>
247 bool
249 {
250  return m_hasRecentlyReceived;
251 }
252 
253 template<class T, class U>
254 void
256 {
257  m_hasRecentlyReceived = false;
258 }
259 
260 } // namespace nfd::face
261 
262 #endif // NFD_DAEMON_FACE_DATAGRAM_TRANSPORT_HPP
Implements Transport for datagram-based protocols.
void processErrorCode(const boost::system::error_code &error)
ssize_t getSendQueueLength() override
Returns the current send queue length of the transport (in octets).
DatagramTransport(typename protocol::socket &&socket)
Construct datagram transport.
void doSend(const Block &packet) override
Performs Transport specific operations to send a packet.
void handleSend(const boost::system::error_code &error, size_t nBytesSent)
void receiveDatagram(span< const uint8_t > buffer, const boost::system::error_code &error)
Receive datagram, translate buffer into packet, deliver to parent class.
void handleReceive(const boost::system::error_code &error, size_t nBytesReceived)
void doClose() override
Performs Transport specific operations to close the transport.
The lower half of a Face.
Definition: transport.hpp:114
void setSendQueueCapacity(ssize_t sendQueueCapacity) noexcept
Definition: transport.hpp:361
#define NFD_LOG_FACE_ERROR(msg)
Log a message at ERROR level.
#define NFD_LOG_FACE_DEBUG(msg)
Log a message at DEBUG level.
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
#define NFD_LOG_MEMBER_DECL()
Definition: logger.hpp:32
@ CLOSED
the transport is closed, and can be safely deallocated
@ CLOSING
the transport is being closed gracefully, either by the peer or by a call to close()
@ FAILED
the transport is being closed due to a failure
constexpr ssize_t QUEUE_ERROR
Indicates that the transport was unable to retrieve the queue capacity/length.
Definition: transport.hpp:107
ssize_t getTxQueueLength(int fd)
Obtain send queue length from a specified system socket.
boost::asio::io_service & getGlobalIoService()
Returns the global io_service instance for the calling thread.
Definition: global.cpp:36