generic-link-service.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2018, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #include "generic-link-service.hpp"
27 
28 #include <ndn-cxx/lp/tags.hpp>
29 
30 #include <cmath>
31 
32 namespace nfd {
33 namespace face {
34 
35 NFD_LOG_INIT(GenericLinkService);
36 
37 constexpr uint32_t DEFAULT_CONGESTION_THRESHOLD_DIVISOR = 2;
38 
40  : m_options(options)
41  , m_fragmenter(m_options.fragmenterOptions, this)
42  , m_reassembler(m_options.reassemblerOptions, this)
43  , m_reliability(m_options.reliabilityOptions, this)
44  , m_lastSeqNo(-2)
45  , m_nextMarkTime(time::steady_clock::TimePoint::max())
46  , m_lastMarkTime(time::steady_clock::TimePoint::min())
47  , m_nMarkedSinceInMarkingState(0)
48 {
49  m_reassembler.beforeTimeout.connect([this] (auto...) { ++this->nReassemblyTimeouts; });
50  m_reliability.onDroppedInterest.connect([this] (const auto& i) { this->notifyDroppedInterest(i); });
51  nReassembling.observe(&m_reassembler);
52 }
53 
54 void
56 {
57  m_options = options;
58  m_fragmenter.setOptions(m_options.fragmenterOptions);
59  m_reassembler.setOptions(m_options.reassemblerOptions);
60  m_reliability.setOptions(m_options.reliabilityOptions);
61 }
62 
63 void
64 GenericLinkService::requestIdlePacket()
65 {
66  // No need to request Acks to attach to this packet from LpReliability, as they are already
67  // attached in sendLpPacket
68  this->sendLpPacket({});
69 }
70 
71 void
72 GenericLinkService::sendLpPacket(lp::Packet&& pkt)
73 {
74  const ssize_t mtu = this->getTransport()->getMtu();
75 
76  if (m_options.reliabilityOptions.isEnabled) {
77  m_reliability.piggyback(pkt, mtu);
78  }
79 
80  if (m_options.allowCongestionMarking) {
81  checkCongestionLevel(pkt);
82  }
83 
84  Transport::Packet tp(pkt.wireEncode());
85  if (mtu != MTU_UNLIMITED && tp.packet.size() > static_cast<size_t>(mtu)) {
86  ++this->nOutOverMtu;
87  NFD_LOG_FACE_WARN("attempted to send packet over MTU limit");
88  return;
89  }
90  this->sendPacket(std::move(tp));
91 }
92 
93 void
94 GenericLinkService::doSendInterest(const Interest& interest)
95 {
96  lp::Packet lpPacket(interest.wireEncode());
97 
98  encodeLpFields(interest, lpPacket);
99 
100  this->sendNetPacket(std::move(lpPacket), true);
101 }
102 
103 void
104 GenericLinkService::doSendData(const Data& data)
105 {
106  lp::Packet lpPacket(data.wireEncode());
107 
108  encodeLpFields(data, lpPacket);
109 
110  this->sendNetPacket(std::move(lpPacket), false);
111 }
112 
113 void
114 GenericLinkService::doSendNack(const lp::Nack& nack)
115 {
116  lp::Packet lpPacket(nack.getInterest().wireEncode());
117  lpPacket.add<lp::NackField>(nack.getHeader());
118 
119  encodeLpFields(nack, lpPacket);
120 
121  this->sendNetPacket(std::move(lpPacket), false);
122 }
123 
124 void
125 GenericLinkService::encodeLpFields(const ndn::PacketBase& netPkt, lp::Packet& lpPacket)
126 {
127  if (m_options.allowLocalFields) {
128  shared_ptr<lp::IncomingFaceIdTag> incomingFaceIdTag = netPkt.getTag<lp::IncomingFaceIdTag>();
129  if (incomingFaceIdTag != nullptr) {
130  lpPacket.add<lp::IncomingFaceIdField>(*incomingFaceIdTag);
131  }
132  }
133 
134  shared_ptr<lp::CongestionMarkTag> congestionMarkTag = netPkt.getTag<lp::CongestionMarkTag>();
135  if (congestionMarkTag != nullptr) {
136  lpPacket.add<lp::CongestionMarkField>(*congestionMarkTag);
137  }
138 
139  if (m_options.allowSelfLearning) {
140  shared_ptr<lp::NonDiscoveryTag> nonDiscoveryTag = netPkt.getTag<lp::NonDiscoveryTag>();
141  if (nonDiscoveryTag != nullptr) {
142  lpPacket.add<lp::NonDiscoveryField>(*nonDiscoveryTag);
143  }
144 
145  shared_ptr<lp::PrefixAnnouncementTag> prefixAnnouncementTag = netPkt.getTag<lp::PrefixAnnouncementTag>();
146  if (prefixAnnouncementTag != nullptr) {
147  lpPacket.add<lp::PrefixAnnouncementField>(*prefixAnnouncementTag);
148  }
149  }
150 }
151 
152 void
153 GenericLinkService::sendNetPacket(lp::Packet&& pkt, bool isInterest)
154 {
155  std::vector<lp::Packet> frags;
156  ssize_t mtu = this->getTransport()->getMtu();
157 
158  // Make space for feature fields in fragments
159  if (m_options.reliabilityOptions.isEnabled && mtu != MTU_UNLIMITED) {
161  }
162 
163  if (m_options.allowCongestionMarking && mtu != MTU_UNLIMITED) {
164  mtu -= CONGESTION_MARK_SIZE;
165  }
166 
167  BOOST_ASSERT(mtu == MTU_UNLIMITED || mtu > 0);
168 
169  if (m_options.allowFragmentation && mtu != MTU_UNLIMITED) {
170  bool isOk = false;
171  std::tie(isOk, frags) = m_fragmenter.fragmentPacket(pkt, mtu);
172  if (!isOk) {
173  // fragmentation failed (warning is logged by LpFragmenter)
174  ++this->nFragmentationErrors;
175  return;
176  }
177  }
178  else {
179  if (m_options.reliabilityOptions.isEnabled) {
180  frags.push_back(pkt);
181  }
182  else {
183  frags.push_back(std::move(pkt));
184  }
185  }
186 
187  if (frags.size() == 1) {
188  // even if indexed fragmentation is enabled, the fragmenter should not
189  // fragment the packet if it can fit in MTU
190  BOOST_ASSERT(!frags.front().has<lp::FragIndexField>());
191  BOOST_ASSERT(!frags.front().has<lp::FragCountField>());
192  }
193 
194  // Only assign sequences to fragments if packet contains more than 1 fragment
195  if (frags.size() > 1) {
196  // Assign sequences to all fragments
197  this->assignSequences(frags);
198  }
199 
200  if (m_options.reliabilityOptions.isEnabled && frags.front().has<lp::FragmentField>()) {
201  m_reliability.handleOutgoing(frags, std::move(pkt), isInterest);
202  }
203 
204  for (lp::Packet& frag : frags) {
205  this->sendLpPacket(std::move(frag));
206  }
207 }
208 
209 void
210 GenericLinkService::assignSequence(lp::Packet& pkt)
211 {
212  pkt.set<lp::SequenceField>(++m_lastSeqNo);
213 }
214 
215 void
216 GenericLinkService::assignSequences(std::vector<lp::Packet>& pkts)
217 {
218  std::for_each(pkts.begin(), pkts.end(), [this] (auto& pkt) { this->assignSequence(pkt); });
219 }
220 
221 void
222 GenericLinkService::checkCongestionLevel(lp::Packet& pkt)
223 {
224  ssize_t sendQueueLength = getTransport()->getSendQueueLength();
225  // This operation requires that the transport supports retrieving current send queue length
226  if (sendQueueLength < 0) {
227  return;
228  }
229 
230  // To avoid overflowing the queue, set the congestion threshold to at least half of the send
231  // queue capacity.
232  size_t congestionThreshold = m_options.defaultCongestionThreshold;
233  if (getTransport()->getSendQueueCapacity() >= 0) {
234  congestionThreshold = std::min(congestionThreshold,
235  static_cast<size_t>(getTransport()->getSendQueueCapacity()) /
236  DEFAULT_CONGESTION_THRESHOLD_DIVISOR);
237  }
238 
239  if (sendQueueLength > 0) {
240  NFD_LOG_FACE_TRACE("txqlen=" << sendQueueLength << " threshold=" << congestionThreshold <<
241  " capacity=" << getTransport()->getSendQueueCapacity());
242  }
243 
244  if (static_cast<size_t>(sendQueueLength) > congestionThreshold) { // Send queue is congested
245  const auto now = time::steady_clock::now();
246  if (now >= m_nextMarkTime || now >= m_lastMarkTime + m_options.baseCongestionMarkingInterval) {
247  // Mark at most one initial packet per baseCongestionMarkingInterval
248  if (m_nMarkedSinceInMarkingState == 0) {
249  m_nextMarkTime = now;
250  }
251 
252  // Time to mark packet
253  pkt.set<lp::CongestionMarkField>(1);
255  NFD_LOG_FACE_DEBUG("LpPacket was marked as congested");
256 
257  ++m_nMarkedSinceInMarkingState;
258  // Decrease the marking interval by the inverse of the square root of the number of packets
259  // marked in this incident of congestion
260  m_nextMarkTime += time::nanoseconds(static_cast<time::nanoseconds::rep>(
261  m_options.baseCongestionMarkingInterval.count() /
262  std::sqrt(m_nMarkedSinceInMarkingState)));
263  m_lastMarkTime = now;
264  }
265  }
266  else if (m_nextMarkTime != time::steady_clock::TimePoint::max()) {
267  // Congestion incident has ended, so reset
268  NFD_LOG_FACE_DEBUG("Send queue length dropped below congestion threshold");
269  m_nextMarkTime = time::steady_clock::TimePoint::max();
270  m_nMarkedSinceInMarkingState = 0;
271  }
272 }
273 
274 void
275 GenericLinkService::doReceivePacket(Transport::Packet&& packet)
276 {
277  try {
278  lp::Packet pkt(packet.packet);
279 
280  if (m_options.reliabilityOptions.isEnabled) {
281  m_reliability.processIncomingPacket(pkt);
282  }
283 
284  if (!pkt.has<lp::FragmentField>()) {
285  NFD_LOG_FACE_TRACE("received IDLE packet: DROP");
286  return;
287  }
288 
289  if ((pkt.has<lp::FragIndexField>() || pkt.has<lp::FragCountField>()) &&
290  !m_options.allowReassembly) {
291  NFD_LOG_FACE_WARN("received fragment, but reassembly disabled: DROP");
292  return;
293  }
294 
295  bool isReassembled = false;
296  Block netPkt;
297  lp::Packet firstPkt;
298  std::tie(isReassembled, netPkt, firstPkt) = m_reassembler.receiveFragment(packet.remoteEndpoint,
299  pkt);
300  if (isReassembled) {
301  this->decodeNetPacket(netPkt, firstPkt);
302  }
303  }
304  catch (const tlv::Error& e) {
305  ++this->nInLpInvalid;
306  NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
307  }
308 }
309 
310 void
311 GenericLinkService::decodeNetPacket(const Block& netPkt, const lp::Packet& firstPkt)
312 {
313  try {
314  switch (netPkt.type()) {
315  case tlv::Interest:
316  if (firstPkt.has<lp::NackField>()) {
317  this->decodeNack(netPkt, firstPkt);
318  }
319  else {
320  this->decodeInterest(netPkt, firstPkt);
321  }
322  break;
323  case tlv::Data:
324  this->decodeData(netPkt, firstPkt);
325  break;
326  default:
327  ++this->nInNetInvalid;
328  NFD_LOG_FACE_WARN("unrecognized network-layer packet TLV-TYPE " << netPkt.type() << ": DROP");
329  return;
330  }
331  }
332  catch (const tlv::Error& e) {
333  ++this->nInNetInvalid;
334  NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
335  }
336 }
337 
338 void
339 GenericLinkService::decodeInterest(const Block& netPkt, const lp::Packet& firstPkt)
340 {
341  BOOST_ASSERT(netPkt.type() == tlv::Interest);
342  BOOST_ASSERT(!firstPkt.has<lp::NackField>());
343 
344  // forwarding expects Interest to be created with make_shared
345  auto interest = make_shared<Interest>(netPkt);
346 
347  if (firstPkt.has<lp::NextHopFaceIdField>()) {
348  if (m_options.allowLocalFields) {
349  interest->setTag(make_shared<lp::NextHopFaceIdTag>(firstPkt.get<lp::NextHopFaceIdField>()));
350  }
351  else {
352  NFD_LOG_FACE_WARN("received NextHopFaceId, but local fields disabled: DROP");
353  return;
354  }
355  }
356 
357  if (firstPkt.has<lp::CachePolicyField>()) {
358  ++this->nInNetInvalid;
359  NFD_LOG_FACE_WARN("received CachePolicy with Interest: DROP");
360  return;
361  }
362 
363  if (firstPkt.has<lp::IncomingFaceIdField>()) {
364  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
365  }
366 
367  if (firstPkt.has<lp::CongestionMarkField>()) {
368  interest->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
369  }
370 
371  if (firstPkt.has<lp::NonDiscoveryField>()) {
372  if (m_options.allowSelfLearning) {
373  interest->setTag(make_shared<lp::NonDiscoveryTag>(firstPkt.get<lp::NonDiscoveryField>()));
374  }
375  else {
376  NFD_LOG_FACE_WARN("received NonDiscovery, but self-learning disabled: IGNORE");
377  }
378  }
379 
380  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
381  ++this->nInNetInvalid;
382  NFD_LOG_FACE_WARN("received PrefixAnnouncement with Interest: DROP");
383  return;
384  }
385 
386  this->receiveInterest(*interest);
387 }
388 
389 void
390 GenericLinkService::decodeData(const Block& netPkt, const lp::Packet& firstPkt)
391 {
392  BOOST_ASSERT(netPkt.type() == tlv::Data);
393 
394  // forwarding expects Data to be created with make_shared
395  auto data = make_shared<Data>(netPkt);
396 
397  if (firstPkt.has<lp::NackField>()) {
398  ++this->nInNetInvalid;
399  NFD_LOG_FACE_WARN("received Nack with Data: DROP");
400  return;
401  }
402 
403  if (firstPkt.has<lp::NextHopFaceIdField>()) {
404  ++this->nInNetInvalid;
405  NFD_LOG_FACE_WARN("received NextHopFaceId with Data: DROP");
406  return;
407  }
408 
409  if (firstPkt.has<lp::CachePolicyField>()) {
410  // CachePolicy is unprivileged and does not require allowLocalFields option.
411  // In case of an invalid CachePolicyType, get<lp::CachePolicyField> will throw,
412  // so it's unnecessary to check here.
413  data->setTag(make_shared<lp::CachePolicyTag>(firstPkt.get<lp::CachePolicyField>()));
414  }
415 
416  if (firstPkt.has<lp::IncomingFaceIdField>()) {
417  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
418  }
419 
420  if (firstPkt.has<lp::CongestionMarkField>()) {
421  data->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
422  }
423 
424  if (firstPkt.has<lp::NonDiscoveryField>()) {
425  ++this->nInNetInvalid;
426  NFD_LOG_FACE_WARN("received NonDiscovery with Data: DROP");
427  return;
428  }
429 
430  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
431  if (m_options.allowSelfLearning) {
432  data->setTag(make_shared<lp::PrefixAnnouncementTag>(firstPkt.get<lp::PrefixAnnouncementField>()));
433  }
434  else {
435  NFD_LOG_FACE_WARN("received PrefixAnnouncement, but self-learning disabled: IGNORE");
436  }
437  }
438 
439  this->receiveData(*data);
440 }
441 
442 void
443 GenericLinkService::decodeNack(const Block& netPkt, const lp::Packet& firstPkt)
444 {
445  BOOST_ASSERT(netPkt.type() == tlv::Interest);
446  BOOST_ASSERT(firstPkt.has<lp::NackField>());
447 
448  lp::Nack nack((Interest(netPkt)));
449  nack.setHeader(firstPkt.get<lp::NackField>());
450 
451  if (firstPkt.has<lp::NextHopFaceIdField>()) {
452  ++this->nInNetInvalid;
453  NFD_LOG_FACE_WARN("received NextHopFaceId with Nack: DROP");
454  return;
455  }
456 
457  if (firstPkt.has<lp::CachePolicyField>()) {
458  ++this->nInNetInvalid;
459  NFD_LOG_FACE_WARN("received CachePolicy with Nack: DROP");
460  return;
461  }
462 
463  if (firstPkt.has<lp::IncomingFaceIdField>()) {
464  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
465  }
466 
467  if (firstPkt.has<lp::CongestionMarkField>()) {
468  nack.setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
469  }
470 
471  if (firstPkt.has<lp::NonDiscoveryField>()) {
472  ++this->nInNetInvalid;
473  NFD_LOG_FACE_WARN("received NonDiscovery with Nack: DROP");
474  return;
475  }
476 
477  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
478  ++this->nInNetInvalid;
479  NFD_LOG_FACE_WARN("received PrefixAnnouncement with Nack: DROP");
480  return;
481  }
482 
483  this->receiveNack(nack);
484 }
485 
486 } // namespace face
487 } // namespace nfd
void processIncomingPacket(const lp::Packet &pkt)
extract and parse all Acks and add Ack for contained Fragment (if any) to AckQueue ...
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
Definition: face-log.hpp:79
virtual ssize_t getSendQueueLength()
Definition: transport.hpp:287
const ssize_t MTU_UNLIMITED
indicates the transport has no limit on payload size
Definition: transport.hpp:96
#define NFD_LOG_FACE_DEBUG(msg)
Log a message at DEBUG level.
Definition: face-log.hpp:82
ssize_t getMtu() const
Definition: transport.hpp:469
void piggyback(lp::Packet &pkt, ssize_t mtu)
called by GenericLinkService to attach Acks onto an outgoing LpPacket
bool isEnabled
enables link-layer reliability
void setOptions(const Options &options)
set options for fragmenter
stores a packet along with the remote endpoint
Definition: transport.hpp:122
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
Definition: face-log.hpp:88
static constexpr size_t RESERVED_HEADER_SPACE
TxSequence TLV-TYPE (3 octets) + TxSequence TLV-LENGTH (1 octet) + sizeof(lp::Sequence) ...
Copyright (c) 2014-2015, Regents of the University of California, Arizona Board of Regents...
Definition: algorithm.hpp:32
signal::Signal< LpReassembler, Transport::EndpointId, size_t > beforeTimeout
signals before a partial packet is dropped due to timeout
std::tuple< bool, std::vector< lp::Packet > > fragmentPacket(const lp::Packet &packet, size_t mtu)
fragments a network-layer packet into link-layer packets
constexpr uint32_t DEFAULT_CONGESTION_THRESHOLD_DIVISOR
void handleOutgoing(std::vector< lp::Packet > &frags, lp::Packet &&pkt, bool isInterest)
observe outgoing fragment(s) of a network packet and store for potential retransmission ...
void setOptions(const Options &options)
set options for reliability
#define NFD_LOG_INIT(name)
Definition: logger.hpp:31
signal::Signal< LpReliability, Interest > onDroppedInterest
signals on Interest dropped by reliability system for exceeding allowed number of retx ...
std::tuple< bool, Block, lp::Packet > receiveFragment(Transport::EndpointId remoteEndpoint, const lp::Packet &packet)
adds received fragment to buffer
void setOptions(const Options &options)
set options for reassembler