generic-link-service.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2018, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #include "generic-link-service.hpp"
27 
28 #include <ndn-cxx/lp/tags.hpp>
29 
30 #include <cmath>
31 
32 namespace nfd {
33 namespace face {
34 
35 NFD_LOG_INIT("GenericLinkService");
36 
37 constexpr uint32_t DEFAULT_CONGESTION_THRESHOLD_DIVISOR = 2;
38 
40  : allowLocalFields(false)
41  , allowFragmentation(false)
42  , allowReassembly(false)
43  , allowCongestionMarking(false)
44  , baseCongestionMarkingInterval(time::milliseconds(100)) // Interval from RFC 8289 (CoDel)
45  , defaultCongestionThreshold(65536) // This default value works well for a queue capacity of 200KiB
46  , allowSelfLearning(false)
47 {
48 }
49 
51  : m_options(options)
52  , m_fragmenter(m_options.fragmenterOptions, this)
53  , m_reassembler(m_options.reassemblerOptions, this)
54  , m_reliability(m_options.reliabilityOptions, this)
55  , m_lastSeqNo(-2)
56  , m_nextMarkTime(time::steady_clock::TimePoint::max())
57  , m_lastMarkTime(time::steady_clock::TimePoint::min())
58  , m_nMarkedSinceInMarkingState(0)
59 {
60  m_reassembler.beforeTimeout.connect(bind([this] { ++this->nReassemblyTimeouts; }));
61  m_reliability.onDroppedInterest.connect([this] (const Interest& i) { this->notifyDroppedInterest(i); });
62  nReassembling.observe(&m_reassembler);
63 }
64 
65 void
67 {
68  m_options = options;
69  m_fragmenter.setOptions(m_options.fragmenterOptions);
70  m_reassembler.setOptions(m_options.reassemblerOptions);
71  m_reliability.setOptions(m_options.reliabilityOptions);
72 }
73 
74 void
75 GenericLinkService::requestIdlePacket()
76 {
77  // No need to request Acks to attach to this packet from LpReliability, as they are already
78  // attached in sendLpPacket
79  this->sendLpPacket({});
80 }
81 
82 void
83 GenericLinkService::sendLpPacket(lp::Packet&& pkt)
84 {
85  const ssize_t mtu = this->getTransport()->getMtu();
86 
87  if (m_options.reliabilityOptions.isEnabled) {
88  m_reliability.piggyback(pkt, mtu);
89  }
90 
91  if (m_options.allowCongestionMarking) {
92  checkCongestionLevel(pkt);
93  }
94 
95  Transport::Packet tp(pkt.wireEncode());
96  if (mtu != MTU_UNLIMITED && tp.packet.size() > static_cast<size_t>(mtu)) {
97  ++this->nOutOverMtu;
98  NFD_LOG_FACE_WARN("attempted to send packet over MTU limit");
99  return;
100  }
101  this->sendPacket(std::move(tp));
102 }
103 
104 void
105 GenericLinkService::doSendInterest(const Interest& interest)
106 {
107  lp::Packet lpPacket(interest.wireEncode());
108 
109  encodeLpFields(interest, lpPacket);
110 
111  this->sendNetPacket(std::move(lpPacket), true);
112 }
113 
114 void
115 GenericLinkService::doSendData(const Data& data)
116 {
117  lp::Packet lpPacket(data.wireEncode());
118 
119  encodeLpFields(data, lpPacket);
120 
121  this->sendNetPacket(std::move(lpPacket), false);
122 }
123 
124 void
125 GenericLinkService::doSendNack(const lp::Nack& nack)
126 {
127  lp::Packet lpPacket(nack.getInterest().wireEncode());
128  lpPacket.add<lp::NackField>(nack.getHeader());
129 
130  encodeLpFields(nack, lpPacket);
131 
132  this->sendNetPacket(std::move(lpPacket), false);
133 }
134 
135 void
136 GenericLinkService::encodeLpFields(const ndn::PacketBase& netPkt, lp::Packet& lpPacket)
137 {
138  if (m_options.allowLocalFields) {
139  shared_ptr<lp::IncomingFaceIdTag> incomingFaceIdTag = netPkt.getTag<lp::IncomingFaceIdTag>();
140  if (incomingFaceIdTag != nullptr) {
141  lpPacket.add<lp::IncomingFaceIdField>(*incomingFaceIdTag);
142  }
143  }
144 
145  shared_ptr<lp::CongestionMarkTag> congestionMarkTag = netPkt.getTag<lp::CongestionMarkTag>();
146  if (congestionMarkTag != nullptr) {
147  lpPacket.add<lp::CongestionMarkField>(*congestionMarkTag);
148  }
149 
150  if (m_options.allowSelfLearning) {
151  shared_ptr<lp::NonDiscoveryTag> nonDiscoveryTag = netPkt.getTag<lp::NonDiscoveryTag>();
152  if (nonDiscoveryTag != nullptr) {
153  lpPacket.add<lp::NonDiscoveryField>(*nonDiscoveryTag);
154  }
155 
156  shared_ptr<lp::PrefixAnnouncementTag> prefixAnnouncementTag = netPkt.getTag<lp::PrefixAnnouncementTag>();
157  if (prefixAnnouncementTag != nullptr) {
158  lpPacket.add<lp::PrefixAnnouncementField>(*prefixAnnouncementTag);
159  }
160  }
161 }
162 
163 void
164 GenericLinkService::sendNetPacket(lp::Packet&& pkt, bool isInterest)
165 {
166  std::vector<lp::Packet> frags;
167  ssize_t mtu = this->getTransport()->getMtu();
168 
169  // Make space for feature fields in fragments
170  if (m_options.reliabilityOptions.isEnabled && mtu != MTU_UNLIMITED) {
172  }
173 
174  if (m_options.allowCongestionMarking && mtu != MTU_UNLIMITED) {
175  mtu -= CONGESTION_MARK_SIZE;
176  }
177 
178  BOOST_ASSERT(mtu == MTU_UNLIMITED || mtu > 0);
179 
180  if (m_options.allowFragmentation && mtu != MTU_UNLIMITED) {
181  bool isOk = false;
182  std::tie(isOk, frags) = m_fragmenter.fragmentPacket(pkt, mtu);
183  if (!isOk) {
184  // fragmentation failed (warning is logged by LpFragmenter)
185  ++this->nFragmentationErrors;
186  return;
187  }
188  }
189  else {
190  if (m_options.reliabilityOptions.isEnabled) {
191  frags.push_back(pkt);
192  }
193  else {
194  frags.push_back(std::move(pkt));
195  }
196  }
197 
198  if (frags.size() == 1) {
199  // even if indexed fragmentation is enabled, the fragmenter should not
200  // fragment the packet if it can fit in MTU
201  BOOST_ASSERT(!frags.front().has<lp::FragIndexField>());
202  BOOST_ASSERT(!frags.front().has<lp::FragCountField>());
203  }
204 
205  // Only assign sequences to fragments if packet contains more than 1 fragment
206  if (frags.size() > 1) {
207  // Assign sequences to all fragments
208  this->assignSequences(frags);
209  }
210 
211  if (m_options.reliabilityOptions.isEnabled && frags.front().has<lp::FragmentField>()) {
212  m_reliability.handleOutgoing(frags, std::move(pkt), isInterest);
213  }
214 
215  for (lp::Packet& frag : frags) {
216  this->sendLpPacket(std::move(frag));
217  }
218 }
219 
220 void
221 GenericLinkService::assignSequence(lp::Packet& pkt)
222 {
223  pkt.set<lp::SequenceField>(++m_lastSeqNo);
224 }
225 
226 void
227 GenericLinkService::assignSequences(std::vector<lp::Packet>& pkts)
228 {
229  std::for_each(pkts.begin(), pkts.end(), bind(&GenericLinkService::assignSequence, this, _1));
230 }
231 
232 void
233 GenericLinkService::checkCongestionLevel(lp::Packet& pkt)
234 {
235  ssize_t sendQueueLength = getTransport()->getSendQueueLength();
236  // This operation requires that the transport supports retrieving current send queue length
237  if (sendQueueLength < 0) {
238  return;
239  }
240 
241  // To avoid overflowing the queue, set the congestion threshold to at least half of the send
242  // queue capacity.
243  size_t congestionThreshold = m_options.defaultCongestionThreshold;
244  if (getTransport()->getSendQueueCapacity() >= 0) {
245  congestionThreshold = std::min(congestionThreshold,
246  static_cast<size_t>(getTransport()->getSendQueueCapacity()) /
247  DEFAULT_CONGESTION_THRESHOLD_DIVISOR);
248  }
249 
250  if (sendQueueLength > 0) {
251  NFD_LOG_FACE_TRACE("txqlen=" << sendQueueLength << " threshold=" << congestionThreshold <<
252  " capacity=" << getTransport()->getSendQueueCapacity());
253  }
254 
255  if (static_cast<size_t>(sendQueueLength) > congestionThreshold) { // Send queue is congested
256  const auto now = time::steady_clock::now();
257  if (now >= m_nextMarkTime || now >= m_lastMarkTime + m_options.baseCongestionMarkingInterval) {
258  // Mark at most one initial packet per baseCongestionMarkingInterval
259  if (m_nMarkedSinceInMarkingState == 0) {
260  m_nextMarkTime = now;
261  }
262 
263  // Time to mark packet
264  pkt.set<lp::CongestionMarkField>(1);
266  NFD_LOG_FACE_DEBUG("LpPacket was marked as congested");
267 
268  ++m_nMarkedSinceInMarkingState;
269  // Decrease the marking interval by the inverse of the square root of the number of packets
270  // marked in this incident of congestion
271  m_nextMarkTime += time::nanoseconds(static_cast<time::nanoseconds::rep>(
272  m_options.baseCongestionMarkingInterval.count() /
273  std::sqrt(m_nMarkedSinceInMarkingState)));
274  m_lastMarkTime = now;
275  }
276  }
277  else if (m_nextMarkTime != time::steady_clock::TimePoint::max()) {
278  // Congestion incident has ended, so reset
279  NFD_LOG_FACE_DEBUG("Send queue length dropped below congestion threshold");
280  m_nextMarkTime = time::steady_clock::TimePoint::max();
281  m_nMarkedSinceInMarkingState = 0;
282  }
283 }
284 
285 void
286 GenericLinkService::doReceivePacket(Transport::Packet&& packet)
287 {
288  try {
289  lp::Packet pkt(packet.packet);
290 
291  if (m_options.reliabilityOptions.isEnabled) {
292  m_reliability.processIncomingPacket(pkt);
293  }
294 
295  if (!pkt.has<lp::FragmentField>()) {
296  NFD_LOG_FACE_TRACE("received IDLE packet: DROP");
297  return;
298  }
299 
300  if ((pkt.has<lp::FragIndexField>() || pkt.has<lp::FragCountField>()) &&
301  !m_options.allowReassembly) {
302  NFD_LOG_FACE_WARN("received fragment, but reassembly disabled: DROP");
303  return;
304  }
305 
306  bool isReassembled = false;
307  Block netPkt;
308  lp::Packet firstPkt;
309  std::tie(isReassembled, netPkt, firstPkt) = m_reassembler.receiveFragment(packet.remoteEndpoint,
310  pkt);
311  if (isReassembled) {
312  this->decodeNetPacket(netPkt, firstPkt);
313  }
314  }
315  catch (const tlv::Error& e) {
316  ++this->nInLpInvalid;
317  NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
318  }
319 }
320 
321 void
322 GenericLinkService::decodeNetPacket(const Block& netPkt, const lp::Packet& firstPkt)
323 {
324  try {
325  switch (netPkt.type()) {
326  case tlv::Interest:
327  if (firstPkt.has<lp::NackField>()) {
328  this->decodeNack(netPkt, firstPkt);
329  }
330  else {
331  this->decodeInterest(netPkt, firstPkt);
332  }
333  break;
334  case tlv::Data:
335  this->decodeData(netPkt, firstPkt);
336  break;
337  default:
338  ++this->nInNetInvalid;
339  NFD_LOG_FACE_WARN("unrecognized network-layer packet TLV-TYPE " << netPkt.type() << ": DROP");
340  return;
341  }
342  }
343  catch (const tlv::Error& e) {
344  ++this->nInNetInvalid;
345  NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
346  }
347 }
348 
349 void
350 GenericLinkService::decodeInterest(const Block& netPkt, const lp::Packet& firstPkt)
351 {
352  BOOST_ASSERT(netPkt.type() == tlv::Interest);
353  BOOST_ASSERT(!firstPkt.has<lp::NackField>());
354 
355  // forwarding expects Interest to be created with make_shared
356  auto interest = make_shared<Interest>(netPkt);
357 
358  if (firstPkt.has<lp::NextHopFaceIdField>()) {
359  if (m_options.allowLocalFields) {
360  interest->setTag(make_shared<lp::NextHopFaceIdTag>(firstPkt.get<lp::NextHopFaceIdField>()));
361  }
362  else {
363  NFD_LOG_FACE_WARN("received NextHopFaceId, but local fields disabled: DROP");
364  return;
365  }
366  }
367 
368  if (firstPkt.has<lp::CachePolicyField>()) {
369  ++this->nInNetInvalid;
370  NFD_LOG_FACE_WARN("received CachePolicy with Interest: DROP");
371  return;
372  }
373 
374  if (firstPkt.has<lp::IncomingFaceIdField>()) {
375  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
376  }
377 
378  if (firstPkt.has<lp::CongestionMarkField>()) {
379  interest->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
380  }
381 
382  if (firstPkt.has<lp::NonDiscoveryField>()) {
383  if (m_options.allowSelfLearning) {
384  interest->setTag(make_shared<lp::NonDiscoveryTag>(firstPkt.get<lp::NonDiscoveryField>()));
385  }
386  else {
387  NFD_LOG_FACE_WARN("received NonDiscovery, but self-learning disabled: IGNORE");
388  }
389  }
390 
391  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
392  ++this->nInNetInvalid;
393  NFD_LOG_FACE_WARN("received PrefixAnnouncement with Interest: DROP");
394  return;
395  }
396 
397  this->receiveInterest(*interest);
398 }
399 
400 void
401 GenericLinkService::decodeData(const Block& netPkt, const lp::Packet& firstPkt)
402 {
403  BOOST_ASSERT(netPkt.type() == tlv::Data);
404 
405  // forwarding expects Data to be created with make_shared
406  auto data = make_shared<Data>(netPkt);
407 
408  if (firstPkt.has<lp::NackField>()) {
409  ++this->nInNetInvalid;
410  NFD_LOG_FACE_WARN("received Nack with Data: DROP");
411  return;
412  }
413 
414  if (firstPkt.has<lp::NextHopFaceIdField>()) {
415  ++this->nInNetInvalid;
416  NFD_LOG_FACE_WARN("received NextHopFaceId with Data: DROP");
417  return;
418  }
419 
420  if (firstPkt.has<lp::CachePolicyField>()) {
421  // CachePolicy is unprivileged and does not require allowLocalFields option.
422  // In case of an invalid CachePolicyType, get<lp::CachePolicyField> will throw,
423  // so it's unnecessary to check here.
424  data->setTag(make_shared<lp::CachePolicyTag>(firstPkt.get<lp::CachePolicyField>()));
425  }
426 
427  if (firstPkt.has<lp::IncomingFaceIdField>()) {
428  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
429  }
430 
431  if (firstPkt.has<lp::CongestionMarkField>()) {
432  data->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
433  }
434 
435  if (firstPkt.has<lp::NonDiscoveryField>()) {
436  ++this->nInNetInvalid;
437  NFD_LOG_FACE_WARN("received NonDiscovery with Data: DROP");
438  return;
439  }
440 
441  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
442  if (m_options.allowSelfLearning) {
443  data->setTag(make_shared<lp::PrefixAnnouncementTag>(firstPkt.get<lp::PrefixAnnouncementField>()));
444  }
445  else {
446  NFD_LOG_FACE_WARN("received PrefixAnnouncement, but self-learning disabled: IGNORE");
447  }
448  }
449 
450  this->receiveData(*data);
451 }
452 
453 void
454 GenericLinkService::decodeNack(const Block& netPkt, const lp::Packet& firstPkt)
455 {
456  BOOST_ASSERT(netPkt.type() == tlv::Interest);
457  BOOST_ASSERT(firstPkt.has<lp::NackField>());
458 
459  lp::Nack nack((Interest(netPkt)));
460  nack.setHeader(firstPkt.get<lp::NackField>());
461 
462  if (firstPkt.has<lp::NextHopFaceIdField>()) {
463  ++this->nInNetInvalid;
464  NFD_LOG_FACE_WARN("received NextHopFaceId with Nack: DROP");
465  return;
466  }
467 
468  if (firstPkt.has<lp::CachePolicyField>()) {
469  ++this->nInNetInvalid;
470  NFD_LOG_FACE_WARN("received CachePolicy with Nack: DROP");
471  return;
472  }
473 
474  if (firstPkt.has<lp::IncomingFaceIdField>()) {
475  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
476  }
477 
478  if (firstPkt.has<lp::CongestionMarkField>()) {
479  nack.setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
480  }
481 
482  if (firstPkt.has<lp::NonDiscoveryField>()) {
483  ++this->nInNetInvalid;
484  NFD_LOG_FACE_WARN("received NonDiscovery with Nack: DROP");
485  return;
486  }
487 
488  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
489  ++this->nInNetInvalid;
490  NFD_LOG_FACE_WARN("received PrefixAnnouncement with Nack: DROP");
491  return;
492  }
493 
494  this->receiveNack(nack);
495 }
496 
497 } // namespace face
498 } // namespace nfd
void processIncomingPacket(const lp::Packet &pkt)
extract and parse all Acks and add Ack for contained Fragment (if any) to AckQueue ...
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
Definition: face-log.hpp:79
const ssize_t MTU_UNLIMITED
indicates the transport has no limit on payload size
Definition: transport.hpp:96
#define NFD_LOG_FACE_DEBUG(msg)
Log a message at DEBUG level.
Definition: face-log.hpp:82
ssize_t getMtu() const
Definition: transport.hpp:459
void piggyback(lp::Packet &pkt, ssize_t mtu)
called by GenericLinkService to attach Acks onto an outgoing LpPacket
bool isEnabled
enables link-layer reliability
void setOptions(const Options &options)
set options for fragmenter
stores a packet along with the remote endpoint
Definition: transport.hpp:122
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
Definition: face-log.hpp:88
static constexpr size_t RESERVED_HEADER_SPACE
TxSequence TLV-TYPE (3 octets) + TxSequence TLV-LENGTH (1 octet) + sizeof(lp::Sequence) ...
Copyright (c) 2014-2015, Regents of the University of California, Arizona Board of Regents...
Definition: algorithm.hpp:32
signal::Signal< LpReassembler, Transport::EndpointId, size_t > beforeTimeout
signals before a partial packet is dropped due to timeout
std::tuple< bool, std::vector< lp::Packet > > fragmentPacket(const lp::Packet &packet, size_t mtu)
fragments a network-layer packet into link-layer packets
constexpr uint32_t DEFAULT_CONGESTION_THRESHOLD_DIVISOR
virtual ssize_t getSendQueueLength()
Definition: transport.cpp:130
void handleOutgoing(std::vector< lp::Packet > &frags, lp::Packet &&pkt, bool isInterest)
observe outgoing fragment(s) of a network packet and store for potential retransmission ...
void setOptions(const Options &options)
set options for reliability
#define NFD_LOG_INIT(name)
Definition: logger.hpp:122
signal::Signal< LpReliability, Interest > onDroppedInterest
signals on Interest dropped by reliability system for exceeding allowed number of retx ...
std::tuple< bool, Block, lp::Packet > receiveFragment(Transport::EndpointId remoteEndpoint, const lp::Packet &packet)
adds received fragment to buffer
void setOptions(const Options &options)
set options for reassembler