datagram-transport.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2019, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #ifndef NFD_DAEMON_FACE_DATAGRAM_TRANSPORT_HPP
27 #define NFD_DAEMON_FACE_DATAGRAM_TRANSPORT_HPP
28 
29 #include "transport.hpp"
30 #include "socket-utils.hpp"
31 #include "common/global.hpp"
32 
33 #include <array>
34 
35 namespace nfd {
36 namespace face {
37 
38 struct Unicast {};
39 struct Multicast {};
40 
45 template<class Protocol, class Addressing = Unicast>
47 {
48 public:
49  typedef Protocol protocol;
50 
55  explicit
56  DatagramTransport(typename protocol::socket&& socket);
57 
58  ssize_t
59  getSendQueueLength() override;
60 
63  void
64  receiveDatagram(const uint8_t* buffer, size_t nBytesReceived,
65  const boost::system::error_code& error);
66 
67 protected:
68  void
69  doClose() override;
70 
71  void
72  doSend(const Block& packet, const EndpointId& endpoint) override;
73 
74  void
75  handleSend(const boost::system::error_code& error, size_t nBytesSent);
76 
77  void
78  handleReceive(const boost::system::error_code& error, size_t nBytesReceived);
79 
80  void
81  processErrorCode(const boost::system::error_code& error);
82 
83  bool
84  hasRecentlyReceived() const;
85 
86  void
87  resetRecentlyReceived();
88 
89  static EndpointId
90  makeEndpointId(const typename protocol::endpoint& ep);
91 
92 protected:
93  typename protocol::socket m_socket;
94  typename protocol::endpoint m_sender;
95 
97 
98 private:
99  std::array<uint8_t, ndn::MAX_NDN_PACKET_SIZE> m_receiveBuffer;
100  bool m_hasRecentlyReceived;
101 };
102 
103 
104 template<class T, class U>
105 DatagramTransport<T, U>::DatagramTransport(typename DatagramTransport::protocol::socket&& socket)
106  : m_socket(std::move(socket))
107  , m_hasRecentlyReceived(false)
108 {
109  boost::asio::socket_base::send_buffer_size sendBufferSizeOption;
110  boost::system::error_code error;
111  m_socket.get_option(sendBufferSizeOption, error);
112  if (error) {
113  NFD_LOG_FACE_WARN("Failed to obtain send queue capacity from socket: " << error.message());
115  }
116  else {
117  this->setSendQueueCapacity(sendBufferSizeOption.value());
118  }
119 
120  m_socket.async_receive_from(boost::asio::buffer(m_receiveBuffer), m_sender,
121  [this] (auto&&... args) {
122  this->handleReceive(std::forward<decltype(args)>(args)...);
123  });
124 }
125 
126 template<class T, class U>
127 ssize_t
129 {
130  ssize_t queueLength = getTxQueueLength(m_socket.native_handle());
131  if (queueLength == QUEUE_ERROR) {
132  NFD_LOG_FACE_WARN("Failed to obtain send queue length from socket: " << std::strerror(errno));
133  }
134  return queueLength;
135 }
136 
137 template<class T, class U>
138 void
140 {
141  NFD_LOG_FACE_TRACE(__func__);
142 
143  if (m_socket.is_open()) {
144  // Cancel all outstanding operations and close the socket.
145  // Use the non-throwing variants and ignore errors, if any.
146  boost::system::error_code error;
147  m_socket.cancel(error);
148  m_socket.close(error);
149  }
150 
151  // Ensure that the Transport stays alive at least until
152  // all pending handlers are dispatched
153  getGlobalIoService().post([this] {
155  });
156 }
157 
158 template<class T, class U>
159 void
160 DatagramTransport<T, U>::doSend(const Block& packet, const EndpointId&)
161 {
162  NFD_LOG_FACE_TRACE(__func__);
163 
164  m_socket.async_send(boost::asio::buffer(packet),
165  // 'packet' is copied into the lambda to retain the underlying Buffer
166  [this, packet] (auto&&... args) {
167  this->handleSend(std::forward<decltype(args)>(args)...);
168  });
169 }
170 
171 template<class T, class U>
172 void
173 DatagramTransport<T, U>::receiveDatagram(const uint8_t* buffer, size_t nBytesReceived,
174  const boost::system::error_code& error)
175 {
176  if (error)
177  return processErrorCode(error);
178 
179  NFD_LOG_FACE_TRACE("Received: " << nBytesReceived << " bytes from " << m_sender);
180 
181  bool isOk = false;
182  Block element;
183  std::tie(isOk, element) = Block::fromBuffer(buffer, nBytesReceived);
184  if (!isOk) {
185  NFD_LOG_FACE_WARN("Failed to parse incoming packet from " << m_sender);
186  // This packet won't extend the face lifetime
187  return;
188  }
189  if (element.size() != nBytesReceived) {
190  NFD_LOG_FACE_WARN("Received datagram size and decoded element size don't match");
191  // This packet won't extend the face lifetime
192  return;
193  }
194  m_hasRecentlyReceived = true;
195 
196  this->receive(element, makeEndpointId(m_sender));
197 }
198 
199 template<class T, class U>
200 void
201 DatagramTransport<T, U>::handleReceive(const boost::system::error_code& error, size_t nBytesReceived)
202 {
203  receiveDatagram(m_receiveBuffer.data(), nBytesReceived, error);
204 
205  if (m_socket.is_open())
206  m_socket.async_receive_from(boost::asio::buffer(m_receiveBuffer), m_sender,
207  [this] (auto&&... args) {
208  this->handleReceive(std::forward<decltype(args)>(args)...);
209  });
210 }
211 
212 template<class T, class U>
213 void
214 DatagramTransport<T, U>::handleSend(const boost::system::error_code& error, size_t nBytesSent)
215 {
216  if (error)
217  return processErrorCode(error);
218 
219  NFD_LOG_FACE_TRACE("Successfully sent: " << nBytesSent << " bytes");
220 }
221 
222 template<class T, class U>
223 void
224 DatagramTransport<T, U>::processErrorCode(const boost::system::error_code& error)
225 {
226  NFD_LOG_FACE_TRACE(__func__);
227 
231  error == boost::asio::error::operation_aborted) {
232  // transport is shutting down, ignore any errors
233  return;
234  }
235 
236  if (getPersistency() == ndn::nfd::FACE_PERSISTENCY_PERMANENT) {
237  NFD_LOG_FACE_DEBUG("Permanent face ignores error: " << error.message());
238  return;
239  }
240 
241  NFD_LOG_FACE_ERROR("Send or receive operation failed: " << error.message());
243  doClose();
244 }
245 
246 template<class T, class U>
247 bool
249 {
250  return m_hasRecentlyReceived;
251 }
252 
253 template<class T, class U>
254 void
256 {
257  m_hasRecentlyReceived = false;
258 }
259 
260 template<class T, class U>
262 DatagramTransport<T, U>::makeEndpointId(const typename protocol::endpoint&)
263 {
264  return 0;
265 }
266 
267 } // namespace face
268 } // namespace nfd
269 
270 #endif // NFD_DAEMON_FACE_DATAGRAM_TRANSPORT_HPP
void setSendQueueCapacity(ssize_t sendQueueCapacity)
Definition: transport.hpp:461
void receiveDatagram(const uint8_t *buffer, size_t nBytesReceived, const boost::system::error_code &error)
Receive datagram, translate buffer into packet, deliver to parent class.
void doClose() override
performs Transport specific operations to close the transport
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
void doSend(const Block &packet, const EndpointId &endpoint) override
performs Transport specific operations to send a packet
#define NFD_LOG_FACE_DEBUG(msg)
Log a message at DEBUG level.
STL namespace.
const ssize_t QUEUE_ERROR
indicates that the transport was unable to retrieve the queue capacity/length
Definition: transport.hpp:103
#define NFD_LOG_MEMBER_DECL()
Definition: logger.hpp:32
The lower half of a Face.
Definition: transport.hpp:108
uint64_t EndpointId
Identifies a remote endpoint on the link.
Definition: face-common.hpp:65
the transport is being closed due to a failure
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
Copyright (c) 2014-2015, Regents of the University of California, Arizona Board of Regents...
Definition: algorithm.hpp:32
the transport is closed, and can be safely deallocated
ssize_t getTxQueueLength(int fd)
obtain send queue length from a specified system socket
the transport is being closed gracefully, either by the peer or by a call to close() ...
static EndpointId makeEndpointId(const typename protocol::endpoint &ep)
ndn::nfd::FacePersistency getPersistency() const
Definition: transport.hpp:424
Implements Transport for datagram-based protocols.
void receive(const Block &packet, const EndpointId &endpoint=0)
Pass a received link-layer packet to the upper layer for further processing.
Definition: transport.cpp:115
TransportState getState() const
Definition: transport.hpp:467
void setState(TransportState newState)
set transport state
Definition: transport.cpp:173
void handleReceive(const boost::system::error_code &error, size_t nBytesReceived)
#define NFD_LOG_FACE_ERROR(msg)
Log a message at ERROR level.
DatagramTransport(typename protocol::socket &&socket)
Construct datagram transport.
void handleSend(const boost::system::error_code &error, size_t nBytesSent)
void processErrorCode(const boost::system::error_code &error)
boost::asio::io_service & getGlobalIoService()
Returns the global io_service instance for the calling thread.
Definition: global.cpp:36