generic-link-service.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2022, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #include "generic-link-service.hpp"
27 
28 #include <ndn-cxx/lp/pit-token.hpp>
29 #include <ndn-cxx/lp/tags.hpp>
30 
31 #include <cmath>
32 
33 namespace nfd::face {
34 
35 NFD_LOG_INIT(GenericLinkService);
36 
37 constexpr size_t CONGESTION_MARK_SIZE = tlv::sizeOfVarNumber(lp::tlv::CongestionMark) + // type
38  tlv::sizeOfVarNumber(sizeof(uint64_t)) + // length
39  tlv::sizeOfNonNegativeInteger(UINT64_MAX); // value
40 
42  : m_options(options)
43  , m_fragmenter(m_options.fragmenterOptions, this)
44  , m_reassembler(m_options.reassemblerOptions, this)
45  , m_reliability(m_options.reliabilityOptions, this)
46  , m_lastSeqNo(-2)
47  , m_nextMarkTime(time::steady_clock::time_point::max())
48  , m_nMarkedSinceInMarkingState(0)
49 {
50  m_reassembler.beforeTimeout.connect([this] (auto&&...) { ++nReassemblyTimeouts; });
51  m_reliability.onDroppedInterest.connect([this] (const auto& i) { notifyDroppedInterest(i); });
52  nReassembling.observe(&m_reassembler);
53 }
54 
55 void
57 {
58  m_options = options;
59  m_fragmenter.setOptions(m_options.fragmenterOptions);
60  m_reassembler.setOptions(m_options.reassemblerOptions);
61  m_reliability.setOptions(m_options.reliabilityOptions);
62 }
63 
64 ssize_t
66 {
67  // Since MTU_UNLIMITED is negative, it will implicitly override any finite override MTU
68  return std::min(m_options.overrideMtu, getTransport()->getMtu());
69 }
70 
71 bool
73 {
74  // Not allowed to override unlimited transport MTU
75  if (getTransport()->getMtu() == MTU_UNLIMITED) {
76  return false;
77  }
78 
79  // Override MTU must be at least MIN_MTU (also implicitly forbids MTU_UNLIMITED and MTU_INVALID)
80  return mtu >= MIN_MTU;
81 }
82 
83 void
84 GenericLinkService::requestIdlePacket()
85 {
86  // No need to request Acks to attach to this packet from LpReliability, as they are already
87  // attached in sendLpPacket
88  NFD_LOG_FACE_TRACE("IDLE packet requested");
89  this->sendLpPacket({});
90 }
91 
92 void
93 GenericLinkService::sendLpPacket(lp::Packet&& pkt)
94 {
95  const ssize_t mtu = getEffectiveMtu();
96 
97  if (m_options.reliabilityOptions.isEnabled) {
98  m_reliability.piggyback(pkt, mtu);
99  }
100 
101  if (m_options.allowCongestionMarking) {
102  checkCongestionLevel(pkt);
103  }
104 
105  auto block = pkt.wireEncode();
106  if (mtu != MTU_UNLIMITED && block.size() > static_cast<size_t>(mtu)) {
107  ++nOutOverMtu;
108  NFD_LOG_FACE_WARN("attempted to send packet over MTU limit");
109  return;
110  }
111  this->sendPacket(block);
112 }
113 
114 void
115 GenericLinkService::doSendInterest(const Interest& interest)
116 {
117  lp::Packet lpPacket(interest.wireEncode());
118 
119  encodeLpFields(interest, lpPacket);
120 
121  this->sendNetPacket(std::move(lpPacket), true);
122 }
123 
124 void
125 GenericLinkService::doSendData(const Data& data)
126 {
127  lp::Packet lpPacket(data.wireEncode());
128 
129  encodeLpFields(data, lpPacket);
130 
131  this->sendNetPacket(std::move(lpPacket), false);
132 }
133 
134 void
135 GenericLinkService::doSendNack(const lp::Nack& nack)
136 {
137  lp::Packet lpPacket(nack.getInterest().wireEncode());
138  lpPacket.add<lp::NackField>(nack.getHeader());
139 
140  encodeLpFields(nack, lpPacket);
141 
142  this->sendNetPacket(std::move(lpPacket), false);
143 }
144 
145 void
146 GenericLinkService::assignSequences(std::vector<lp::Packet>& pkts)
147 {
148  std::for_each(pkts.begin(), pkts.end(), [this] (lp::Packet& pkt) {
149  pkt.set<lp::SequenceField>(++m_lastSeqNo);
150  });
151 }
152 
153 void
154 GenericLinkService::encodeLpFields(const ndn::PacketBase& netPkt, lp::Packet& lpPacket)
155 {
156  if (m_options.allowLocalFields) {
157  auto incomingFaceIdTag = netPkt.getTag<lp::IncomingFaceIdTag>();
158  if (incomingFaceIdTag != nullptr) {
159  lpPacket.add<lp::IncomingFaceIdField>(*incomingFaceIdTag);
160  }
161  }
162 
163  auto congestionMarkTag = netPkt.getTag<lp::CongestionMarkTag>();
164  if (congestionMarkTag != nullptr) {
165  lpPacket.add<lp::CongestionMarkField>(*congestionMarkTag);
166  }
167 
168  if (m_options.allowSelfLearning) {
169  auto nonDiscoveryTag = netPkt.getTag<lp::NonDiscoveryTag>();
170  if (nonDiscoveryTag != nullptr) {
171  lpPacket.add<lp::NonDiscoveryField>(*nonDiscoveryTag);
172  }
173 
174  auto prefixAnnouncementTag = netPkt.getTag<lp::PrefixAnnouncementTag>();
175  if (prefixAnnouncementTag != nullptr) {
176  lpPacket.add<lp::PrefixAnnouncementField>(*prefixAnnouncementTag);
177  }
178  }
179 
180  auto pitToken = netPkt.getTag<lp::PitToken>();
181  if (pitToken != nullptr) {
182  lpPacket.add<lp::PitTokenField>(*pitToken);
183  }
184 }
185 
186 void
187 GenericLinkService::sendNetPacket(lp::Packet&& pkt, bool isInterest)
188 {
189  std::vector<lp::Packet> frags;
190  ssize_t mtu = getEffectiveMtu();
191 
192  // Make space for feature fields in fragments
193  if (m_options.reliabilityOptions.isEnabled && mtu != MTU_UNLIMITED) {
195  }
196 
197  if (m_options.allowCongestionMarking && mtu != MTU_UNLIMITED) {
198  mtu -= CONGESTION_MARK_SIZE;
199  }
200 
201  // An MTU of 0 is allowed but will cause all packets to be dropped before transmission
202  BOOST_ASSERT(mtu == MTU_UNLIMITED || mtu >= 0);
203 
204  if (m_options.allowFragmentation && mtu != MTU_UNLIMITED) {
205  bool isOk = false;
206  std::tie(isOk, frags) = m_fragmenter.fragmentPacket(pkt, mtu);
207  if (!isOk) {
208  // fragmentation failed (warning is logged by LpFragmenter)
210  return;
211  }
212  }
213  else {
214  if (m_options.reliabilityOptions.isEnabled) {
215  frags.push_back(pkt);
216  }
217  else {
218  frags.push_back(std::move(pkt));
219  }
220  }
221 
222  if (frags.size() == 1) {
223  // even if indexed fragmentation is enabled, the fragmenter should not
224  // fragment the packet if it can fit in MTU
225  BOOST_ASSERT(!frags.front().has<lp::FragIndexField>());
226  BOOST_ASSERT(!frags.front().has<lp::FragCountField>());
227  }
228 
229  // Only assign sequences to fragments if reliability enabled or if packet contains >1 fragment
230  if (m_options.reliabilityOptions.isEnabled || frags.size() > 1) {
231  // Assign sequences to all fragments
232  this->assignSequences(frags);
233  }
234 
235  if (m_options.reliabilityOptions.isEnabled && frags.front().has<lp::FragmentField>()) {
236  m_reliability.handleOutgoing(frags, std::move(pkt), isInterest);
237  }
238 
239  for (lp::Packet& frag : frags) {
240  this->sendLpPacket(std::move(frag));
241  }
242 }
243 
244 void
245 GenericLinkService::checkCongestionLevel(lp::Packet& pkt)
246 {
247  ssize_t sendQueueLength = getTransport()->getSendQueueLength();
248  // The transport must support retrieving the current send queue length
249  if (sendQueueLength < 0) {
250  return;
251  }
252 
253  if (sendQueueLength > 0) {
254  NFD_LOG_FACE_TRACE("txqlen=" << sendQueueLength << " threshold=" <<
255  m_options.defaultCongestionThreshold << " capacity=" <<
256  getTransport()->getSendQueueCapacity());
257  }
258 
259  // sendQueue is above target
260  if (static_cast<size_t>(sendQueueLength) > m_options.defaultCongestionThreshold) {
261  const auto now = time::steady_clock::now();
262 
263  if (m_nextMarkTime == time::steady_clock::time_point::max()) {
264  m_nextMarkTime = now + m_options.baseCongestionMarkingInterval;
265  }
266  // Mark packet if sendQueue stays above target for one interval
267  else if (now >= m_nextMarkTime) {
268  pkt.set<lp::CongestionMarkField>(1);
270  NFD_LOG_FACE_DEBUG("LpPacket was marked as congested");
271 
272  ++m_nMarkedSinceInMarkingState;
273  // Decrease the marking interval by the inverse of the square root of the number of packets
274  // marked in this incident of congestion
275  time::nanoseconds interval(static_cast<time::nanoseconds::rep>(
276  m_options.baseCongestionMarkingInterval.count() /
277  std::sqrt(m_nMarkedSinceInMarkingState + 1)));
278  m_nextMarkTime += interval;
279  }
280  }
281  else if (m_nextMarkTime != time::steady_clock::time_point::max()) {
282  // Congestion incident has ended, so reset
283  NFD_LOG_FACE_DEBUG("Send queue length dropped below congestion threshold");
284  m_nextMarkTime = time::steady_clock::time_point::max();
285  m_nMarkedSinceInMarkingState = 0;
286  }
287 }
288 
289 void
290 GenericLinkService::doReceivePacket(const Block& packet, const EndpointId& endpoint)
291 {
292  try {
293  lp::Packet pkt(packet);
294 
295  if (m_options.reliabilityOptions.isEnabled) {
296  if (!m_reliability.processIncomingPacket(pkt)) {
297  NFD_LOG_FACE_TRACE("received duplicate fragment: DROP");
299  return;
300  }
301  }
302 
303  if (!pkt.has<lp::FragmentField>()) {
304  NFD_LOG_FACE_TRACE("received IDLE packet: DROP");
305  return;
306  }
307 
308  if ((pkt.has<lp::FragIndexField>() || pkt.has<lp::FragCountField>()) &&
309  !m_options.allowReassembly) {
310  NFD_LOG_FACE_WARN("received fragment, but reassembly disabled: DROP");
311  return;
312  }
313 
314  auto [isReassembled, netPkt, firstPkt] = m_reassembler.receiveFragment(endpoint, pkt);
315  if (isReassembled) {
316  this->decodeNetPacket(netPkt, firstPkt, endpoint);
317  }
318  }
319  catch (const tlv::Error& e) {
320  ++nInLpInvalid;
321  NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
322  }
323 }
324 
325 void
326 GenericLinkService::decodeNetPacket(const Block& netPkt, const lp::Packet& firstPkt,
327  const EndpointId& endpointId)
328 {
329  try {
330  switch (netPkt.type()) {
331  case tlv::Interest:
332  if (firstPkt.has<lp::NackField>()) {
333  this->decodeNack(netPkt, firstPkt, endpointId);
334  }
335  else {
336  this->decodeInterest(netPkt, firstPkt, endpointId);
337  }
338  break;
339  case tlv::Data:
340  this->decodeData(netPkt, firstPkt, endpointId);
341  break;
342  default:
343  ++nInNetInvalid;
344  NFD_LOG_FACE_WARN("unrecognized network-layer packet TLV-TYPE " << netPkt.type() << ": DROP");
345  return;
346  }
347  }
348  catch (const tlv::Error& e) {
349  ++nInNetInvalid;
350  NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
351  }
352 }
353 
354 void
355 GenericLinkService::decodeInterest(const Block& netPkt, const lp::Packet& firstPkt,
356  const EndpointId& endpointId)
357 {
358  BOOST_ASSERT(netPkt.type() == tlv::Interest);
359  BOOST_ASSERT(!firstPkt.has<lp::NackField>());
360 
361  // forwarding expects Interest to be created with make_shared
362  auto interest = make_shared<Interest>(netPkt);
363 
364  if (firstPkt.has<lp::NextHopFaceIdField>()) {
365  if (m_options.allowLocalFields) {
366  interest->setTag(make_shared<lp::NextHopFaceIdTag>(firstPkt.get<lp::NextHopFaceIdField>()));
367  }
368  else {
369  NFD_LOG_FACE_WARN("received NextHopFaceId, but local fields disabled: DROP");
370  return;
371  }
372  }
373 
374  if (firstPkt.has<lp::CachePolicyField>()) {
375  ++nInNetInvalid;
376  NFD_LOG_FACE_WARN("received CachePolicy with Interest: DROP");
377  return;
378  }
379 
380  if (firstPkt.has<lp::IncomingFaceIdField>()) {
381  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
382  }
383 
384  if (firstPkt.has<lp::CongestionMarkField>()) {
385  interest->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
386  }
387 
388  if (firstPkt.has<lp::NonDiscoveryField>()) {
389  if (m_options.allowSelfLearning) {
390  interest->setTag(make_shared<lp::NonDiscoveryTag>(firstPkt.get<lp::NonDiscoveryField>()));
391  }
392  else {
393  NFD_LOG_FACE_WARN("received NonDiscovery, but self-learning disabled: IGNORE");
394  }
395  }
396 
397  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
398  ++nInNetInvalid;
399  NFD_LOG_FACE_WARN("received PrefixAnnouncement with Interest: DROP");
400  return;
401  }
402 
403  if (firstPkt.has<lp::PitTokenField>()) {
404  interest->setTag(make_shared<lp::PitToken>(firstPkt.get<lp::PitTokenField>()));
405  }
406 
407  this->receiveInterest(*interest, endpointId);
408 }
409 
410 void
411 GenericLinkService::decodeData(const Block& netPkt, const lp::Packet& firstPkt,
412  const EndpointId& endpointId)
413 {
414  BOOST_ASSERT(netPkt.type() == tlv::Data);
415 
416  // forwarding expects Data to be created with make_shared
417  auto data = make_shared<Data>(netPkt);
418 
419  if (firstPkt.has<lp::NackField>()) {
420  ++nInNetInvalid;
421  NFD_LOG_FACE_WARN("received Nack with Data: DROP");
422  return;
423  }
424 
425  if (firstPkt.has<lp::NextHopFaceIdField>()) {
426  ++nInNetInvalid;
427  NFD_LOG_FACE_WARN("received NextHopFaceId with Data: DROP");
428  return;
429  }
430 
431  if (firstPkt.has<lp::CachePolicyField>()) {
432  // CachePolicy is unprivileged and does not require allowLocalFields option.
433  // In case of an invalid CachePolicyType, get<lp::CachePolicyField> will throw,
434  // so it's unnecessary to check here.
435  data->setTag(make_shared<lp::CachePolicyTag>(firstPkt.get<lp::CachePolicyField>()));
436  }
437 
438  if (firstPkt.has<lp::IncomingFaceIdField>()) {
439  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
440  }
441 
442  if (firstPkt.has<lp::CongestionMarkField>()) {
443  data->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
444  }
445 
446  if (firstPkt.has<lp::NonDiscoveryField>()) {
447  ++nInNetInvalid;
448  NFD_LOG_FACE_WARN("received NonDiscovery with Data: DROP");
449  return;
450  }
451 
452  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
453  if (m_options.allowSelfLearning) {
454  data->setTag(make_shared<lp::PrefixAnnouncementTag>(firstPkt.get<lp::PrefixAnnouncementField>()));
455  }
456  else {
457  NFD_LOG_FACE_WARN("received PrefixAnnouncement, but self-learning disabled: IGNORE");
458  }
459  }
460 
461  this->receiveData(*data, endpointId);
462 }
463 
464 void
465 GenericLinkService::decodeNack(const Block& netPkt, const lp::Packet& firstPkt,
466  const EndpointId& endpointId)
467 {
468  BOOST_ASSERT(netPkt.type() == tlv::Interest);
469  BOOST_ASSERT(firstPkt.has<lp::NackField>());
470 
471  lp::Nack nack((Interest(netPkt)));
472  nack.setHeader(firstPkt.get<lp::NackField>());
473 
474  if (firstPkt.has<lp::NextHopFaceIdField>()) {
475  ++nInNetInvalid;
476  NFD_LOG_FACE_WARN("received NextHopFaceId with Nack: DROP");
477  return;
478  }
479 
480  if (firstPkt.has<lp::CachePolicyField>()) {
481  ++nInNetInvalid;
482  NFD_LOG_FACE_WARN("received CachePolicy with Nack: DROP");
483  return;
484  }
485 
486  if (firstPkt.has<lp::IncomingFaceIdField>()) {
487  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
488  }
489 
490  if (firstPkt.has<lp::CongestionMarkField>()) {
491  nack.setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
492  }
493 
494  if (firstPkt.has<lp::NonDiscoveryField>()) {
495  ++nInNetInvalid;
496  NFD_LOG_FACE_WARN("received NonDiscovery with Nack: DROP");
497  return;
498  }
499 
500  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
501  ++nInNetInvalid;
502  NFD_LOG_FACE_WARN("received PrefixAnnouncement with Nack: DROP");
503  return;
504  }
505 
506  this->receiveNack(nack, endpointId);
507 }
508 
509 } // namespace nfd::face
void setOptions(const Options &options)
Set options for fragmenter.
std::tuple< bool, std::vector< lp::Packet > > fragmentPacket(const lp::Packet &packet, size_t mtu)
Fragments a network-layer packet into link-layer packets.
std::tuple< bool, Block, lp::Packet > receiveFragment(const EndpointId &remoteEndpoint, const lp::Packet &packet)
Adds received fragment to the buffer.
signal::Signal< LpReassembler, EndpointId, size_t > beforeTimeout
Notifies before a partial packet is dropped due to timeout.
void setOptions(const Options &options)
Set options for reassembler.
signal::Signal< LpReliability, Interest > onDroppedInterest
Signals on Interest dropped by reliability system for exceeding allowed number of retx.
void piggyback(lp::Packet &pkt, ssize_t mtu)
Called by GenericLinkService to attach Acks onto an outgoing LpPacket.
static constexpr size_t RESERVED_HEADER_SPACE
TxSequence TLV-TYPE (3 octets) + TLV-LENGTH (1 octet) + lp::Sequence (8 octets)
void handleOutgoing(std::vector< lp::Packet > &frags, lp::Packet &&pkt, bool isInterest)
Observe outgoing fragment(s) of a network packet and store for potential retransmission.
bool processIncomingPacket(const lp::Packet &pkt)
Extract and parse all Acks and add Ack for contained Fragment (if any) to AckQueue.
void setOptions(const Options &options)
Set options for reliability.
virtual ssize_t getSendQueueLength()
Returns the current send queue length of the transport (in octets).
Definition: transport.hpp:317
#define NFD_LOG_FACE_DEBUG(msg)
Log a message at DEBUG level.
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
#define NFD_LOG_INIT(name)
Definition: logger.hpp:31
std::variant< std::monostate, ethernet::Address, udp::Endpoint > EndpointId
Identifies a remote endpoint on the link.
Definition: face-common.hpp:77
constexpr size_t CONGESTION_MARK_SIZE
constexpr ssize_t MIN_MTU
Minimum MTU that may be set.
Definition: face-common.hpp:65
constexpr ssize_t MTU_UNLIMITED
Indicates that the transport has no limit on payload size.
Definition: transport.hpp:92
bool isEnabled
Enables link-layer reliability.