generic-link-service.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2019, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #include "generic-link-service.hpp"
27 
28 #include <ndn-cxx/lp/pit-token.hpp>
29 #include <ndn-cxx/lp/tags.hpp>
30 
31 #include <cmath>
32 
33 namespace nfd {
34 namespace face {
35 
36 NFD_LOG_INIT(GenericLinkService);
37 
38 constexpr size_t CONGESTION_MARK_SIZE = tlv::sizeOfVarNumber(lp::tlv::CongestionMark) + // type
39  tlv::sizeOfVarNumber(sizeof(uint64_t)) + // length
40  tlv::sizeOfNonNegativeInteger(UINT64_MAX); // value
41 
// Member-initializer list and body of the GenericLinkService constructor.
// NOTE(review): the constructor's declarator line (listing line 42) is not present
// in this copy of the file; only the part below is visible -- confirm against upstream.
//
// The fragmenter, reassembler and reliability helpers each receive their own
// sub-options from m_options plus a back-pointer to this link service.
43  : m_options(options)
44  , m_fragmenter(m_options.fragmenterOptions, this)
45  , m_reassembler(m_options.reassemblerOptions, this)
46  , m_reliability(m_options.reliabilityOptions, this)
// NOTE(review): presumably -2 wraps around on an unsigned sequence type so that the
// counter rolls over soon after start-up -- confirm against the member's declaration.
47  , m_lastSeqNo(-2)
// TimePoint::max() is the value checkCongestionLevel() compares against to detect
// that no congestion incident is currently being tracked.
48  , m_nextMarkTime(time::steady_clock::TimePoint::max())
49  , m_nMarkedSinceInMarkingState(0)
50 {
// Count every reassembly timeout signalled by the reassembler.
51  m_reassembler.beforeTimeout.connect([this] (auto...) { ++this->nReassemblyTimeouts; });
// Relay Interests dropped by the reliability helper to notifyDroppedInterest().
52  m_reliability.onDroppedInterest.connect([this] (const auto& i) { this->notifyDroppedInterest(i); });
// Let the nReassembling counter observe the reassembler.
53  nReassembling.observe(&m_reassembler);
54 }
55 
// Replaces the stored options and pushes the matching sub-options down to the
// fragmenter, reassembler and reliability helpers.
// NOTE(review): the line carrying the qualified function name and parameter list
// (listing line 57) is not present in this copy of the file -- confirm against upstream.
56 void
58 {
59  m_options = options;
// Each helper is updated from the freshly stored copy, not from the argument.
60  m_fragmenter.setOptions(m_options.fragmenterOptions);
61  m_reassembler.setOptions(m_options.reassemblerOptions);
62  m_reliability.setOptions(m_options.reliabilityOptions);
63 }
64 
65 void
66 GenericLinkService::requestIdlePacket(const EndpointId& endpointId)
67 {
68  // No need to request Acks to attach to this packet from LpReliability, as they are already
69  // attached in sendLpPacket
70  this->sendLpPacket({}, endpointId);
71 }
72 
73 void
74 GenericLinkService::sendLpPacket(lp::Packet&& pkt, const EndpointId& endpointId)
75 {
76  const ssize_t mtu = this->getTransport()->getMtu();
77 
78  if (m_options.reliabilityOptions.isEnabled) {
79  m_reliability.piggyback(pkt, mtu);
80  }
81 
82  if (m_options.allowCongestionMarking) {
83  checkCongestionLevel(pkt);
84  }
85 
86  auto block = pkt.wireEncode();
87  if (mtu != MTU_UNLIMITED && block.size() > static_cast<size_t>(mtu)) {
88  ++this->nOutOverMtu;
89  NFD_LOG_FACE_WARN("attempted to send packet over MTU limit");
90  return;
91  }
92  this->sendPacket(block, endpointId);
93 }
94 
95 void
96 GenericLinkService::doSendInterest(const Interest& interest, const EndpointId& endpointId)
97 {
98  lp::Packet lpPacket(interest.wireEncode());
99 
100  encodeLpFields(interest, lpPacket);
101 
102  this->sendNetPacket(std::move(lpPacket), endpointId, true);
103 }
104 
105 void
106 GenericLinkService::doSendData(const Data& data, const EndpointId& endpointId)
107 {
108  lp::Packet lpPacket(data.wireEncode());
109 
110  encodeLpFields(data, lpPacket);
111 
112  this->sendNetPacket(std::move(lpPacket), endpointId, false);
113 }
114 
115 void
116 GenericLinkService::doSendNack(const lp::Nack& nack, const EndpointId& endpointId)
117 {
118  lp::Packet lpPacket(nack.getInterest().wireEncode());
119  lpPacket.add<lp::NackField>(nack.getHeader());
120 
121  encodeLpFields(nack, lpPacket);
122 
123  this->sendNetPacket(std::move(lpPacket), endpointId, false);
124 }
125 
126 void
127 GenericLinkService::encodeLpFields(const ndn::PacketBase& netPkt, lp::Packet& lpPacket)
128 {
129  if (m_options.allowLocalFields) {
130  auto incomingFaceIdTag = netPkt.getTag<lp::IncomingFaceIdTag>();
131  if (incomingFaceIdTag != nullptr) {
132  lpPacket.add<lp::IncomingFaceIdField>(*incomingFaceIdTag);
133  }
134  }
135 
136  auto congestionMarkTag = netPkt.getTag<lp::CongestionMarkTag>();
137  if (congestionMarkTag != nullptr) {
138  lpPacket.add<lp::CongestionMarkField>(*congestionMarkTag);
139  }
140 
141  if (m_options.allowSelfLearning) {
142  auto nonDiscoveryTag = netPkt.getTag<lp::NonDiscoveryTag>();
143  if (nonDiscoveryTag != nullptr) {
144  lpPacket.add<lp::NonDiscoveryField>(*nonDiscoveryTag);
145  }
146 
147  auto prefixAnnouncementTag = netPkt.getTag<lp::PrefixAnnouncementTag>();
148  if (prefixAnnouncementTag != nullptr) {
149  lpPacket.add<lp::PrefixAnnouncementField>(*prefixAnnouncementTag);
150  }
151  }
152 
153  auto pitToken = netPkt.getTag<lp::PitToken>();
154  if (pitToken != nullptr) {
155  lpPacket.add<lp::PitTokenField>(*pitToken);
156  }
157 }
158 
159 void
160 GenericLinkService::sendNetPacket(lp::Packet&& pkt, const EndpointId& endpointId, bool isInterest)
161 {
162  std::vector<lp::Packet> frags;
163  ssize_t mtu = this->getTransport()->getMtu();
164 
165  // Make space for feature fields in fragments
166  if (m_options.reliabilityOptions.isEnabled && mtu != MTU_UNLIMITED) {
168  }
169 
170  if (m_options.allowCongestionMarking && mtu != MTU_UNLIMITED) {
171  mtu -= CONGESTION_MARK_SIZE;
172  }
173 
174  BOOST_ASSERT(mtu == MTU_UNLIMITED || mtu > 0);
175 
176  if (m_options.allowFragmentation && mtu != MTU_UNLIMITED) {
177  bool isOk = false;
178  std::tie(isOk, frags) = m_fragmenter.fragmentPacket(pkt, mtu);
179  if (!isOk) {
180  // fragmentation failed (warning is logged by LpFragmenter)
181  ++this->nFragmentationErrors;
182  return;
183  }
184  }
185  else {
186  if (m_options.reliabilityOptions.isEnabled) {
187  frags.push_back(pkt);
188  }
189  else {
190  frags.push_back(std::move(pkt));
191  }
192  }
193 
194  if (frags.size() == 1) {
195  // even if indexed fragmentation is enabled, the fragmenter should not
196  // fragment the packet if it can fit in MTU
197  BOOST_ASSERT(!frags.front().has<lp::FragIndexField>());
198  BOOST_ASSERT(!frags.front().has<lp::FragCountField>());
199  }
200 
201  // Only assign sequences to fragments if packet contains more than 1 fragment
202  if (frags.size() > 1) {
203  // Assign sequences to all fragments
204  this->assignSequences(frags);
205  }
206 
207  if (m_options.reliabilityOptions.isEnabled && frags.front().has<lp::FragmentField>()) {
208  m_reliability.handleOutgoing(frags, std::move(pkt), isInterest);
209  }
210 
211  for (lp::Packet& frag : frags) {
212  this->sendLpPacket(std::move(frag), endpointId);
213  }
214 }
215 
216 void
217 GenericLinkService::assignSequence(lp::Packet& pkt)
218 {
219  pkt.set<lp::SequenceField>(++m_lastSeqNo);
220 }
221 
222 void
223 GenericLinkService::assignSequences(std::vector<lp::Packet>& pkts)
224 {
225  std::for_each(pkts.begin(), pkts.end(), [this] (auto& pkt) { this->assignSequence(pkt); });
226 }
227 
// Inspects the transport's send queue and, when the queue has stayed above
// m_options.defaultCongestionThreshold for long enough, sets a CongestionMark
// of 1 on the outgoing packet.
//
// State used across calls:
//  - m_nextMarkTime: earliest time the next packet may be marked;
//    TimePoint::max() means no congestion incident is being tracked.
//  - m_nMarkedSinceInMarkingState: packets marked during the current incident.
228 void
229 GenericLinkService::checkCongestionLevel(lp::Packet& pkt)
230 {
231  ssize_t sendQueueLength = getTransport()->getSendQueueLength();
232  // The transport must support retrieving the current send queue length
233  if (sendQueueLength < 0) {
234  return;
235  }
236 
// Trace only when something is actually queued.
237  if (sendQueueLength > 0) {
238  NFD_LOG_FACE_TRACE("txqlen=" << sendQueueLength << " threshold=" <<
239  m_options.defaultCongestionThreshold << " capacity=" <<
240  getTransport()->getSendQueueCapacity());
241  }
242 
243  // sendQueue is above target
244  if (static_cast<size_t>(sendQueueLength) > m_options.defaultCongestionThreshold) {
245  const auto now = time::steady_clock::now();
246 
// First observation above the threshold: start the incident, do not mark yet.
247  if (m_nextMarkTime == time::steady_clock::TimePoint::max()) {
248  m_nextMarkTime = now + m_options.baseCongestionMarkingInterval;
249  }
250  // Mark packet if sendQueue stays above target for one interval
251  else if (now >= m_nextMarkTime) {
252  pkt.set<lp::CongestionMarkField>(1);
// NOTE(review): listing line 253 is absent between the surrounding two statements
// in this copy of the file -- confirm against upstream whether a statement was lost.
254  NFD_LOG_FACE_DEBUG("LpPacket was marked as congested");
255 
256  ++m_nMarkedSinceInMarkingState;
257  // Decrease the marking interval by the inverse of the square root of the number of packets
258  // marked in this incident of congestion
// i.e. interval = baseCongestionMarkingInterval / sqrt(marked + 1), truncated to
// whole nanoseconds; it is added to the previous deadline, not to `now`.
259  time::nanoseconds interval(static_cast<time::nanoseconds::rep>(
260  m_options.baseCongestionMarkingInterval.count() /
261  std::sqrt(m_nMarkedSinceInMarkingState + 1)));
262  m_nextMarkTime += interval;
263  }
264  }
265  else if (m_nextMarkTime != time::steady_clock::TimePoint::max()) {
266  // Congestion incident has ended, so reset
267  NFD_LOG_FACE_DEBUG("Send queue length dropped below congestion threshold");
268  m_nextMarkTime = time::steady_clock::TimePoint::max();
269  m_nMarkedSinceInMarkingState = 0;
270  }
271 }
272 
273 void
274 GenericLinkService::doReceivePacket(const Block& packet, const EndpointId& endpoint)
275 {
276  try {
277  lp::Packet pkt(packet);
278 
279  if (m_options.reliabilityOptions.isEnabled) {
280  m_reliability.processIncomingPacket(pkt);
281  }
282 
283  if (!pkt.has<lp::FragmentField>()) {
284  NFD_LOG_FACE_TRACE("received IDLE packet: DROP");
285  return;
286  }
287 
288  if ((pkt.has<lp::FragIndexField>() || pkt.has<lp::FragCountField>()) &&
289  !m_options.allowReassembly) {
290  NFD_LOG_FACE_WARN("received fragment, but reassembly disabled: DROP");
291  return;
292  }
293 
294  bool isReassembled = false;
295  Block netPkt;
296  lp::Packet firstPkt;
297  std::tie(isReassembled, netPkt, firstPkt) = m_reassembler.receiveFragment(endpoint, pkt);
298  if (isReassembled) {
299  this->decodeNetPacket(netPkt, firstPkt, endpoint);
300  }
301  }
302  catch (const tlv::Error& e) {
303  ++this->nInLpInvalid;
304  NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
305  }
306 }
307 
308 void
309 GenericLinkService::decodeNetPacket(const Block& netPkt, const lp::Packet& firstPkt,
310  const EndpointId& endpointId)
311 {
312  try {
313  switch (netPkt.type()) {
314  case tlv::Interest:
315  if (firstPkt.has<lp::NackField>()) {
316  this->decodeNack(netPkt, firstPkt, endpointId);
317  }
318  else {
319  this->decodeInterest(netPkt, firstPkt, endpointId);
320  }
321  break;
322  case tlv::Data:
323  this->decodeData(netPkt, firstPkt, endpointId);
324  break;
325  default:
326  ++this->nInNetInvalid;
327  NFD_LOG_FACE_WARN("unrecognized network-layer packet TLV-TYPE " << netPkt.type() << ": DROP");
328  return;
329  }
330  }
331  catch (const tlv::Error& e) {
332  ++this->nInNetInvalid;
333  NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
334  }
335 }
336 
337 void
338 GenericLinkService::decodeInterest(const Block& netPkt, const lp::Packet& firstPkt,
339  const EndpointId& endpointId)
340 {
341  BOOST_ASSERT(netPkt.type() == tlv::Interest);
342  BOOST_ASSERT(!firstPkt.has<lp::NackField>());
343 
344  // forwarding expects Interest to be created with make_shared
345  auto interest = make_shared<Interest>(netPkt);
346 
347  if (firstPkt.has<lp::NextHopFaceIdField>()) {
348  if (m_options.allowLocalFields) {
349  interest->setTag(make_shared<lp::NextHopFaceIdTag>(firstPkt.get<lp::NextHopFaceIdField>()));
350  }
351  else {
352  NFD_LOG_FACE_WARN("received NextHopFaceId, but local fields disabled: DROP");
353  return;
354  }
355  }
356 
357  if (firstPkt.has<lp::CachePolicyField>()) {
358  ++this->nInNetInvalid;
359  NFD_LOG_FACE_WARN("received CachePolicy with Interest: DROP");
360  return;
361  }
362 
363  if (firstPkt.has<lp::IncomingFaceIdField>()) {
364  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
365  }
366 
367  if (firstPkt.has<lp::CongestionMarkField>()) {
368  interest->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
369  }
370 
371  if (firstPkt.has<lp::NonDiscoveryField>()) {
372  if (m_options.allowSelfLearning) {
373  interest->setTag(make_shared<lp::NonDiscoveryTag>(firstPkt.get<lp::NonDiscoveryField>()));
374  }
375  else {
376  NFD_LOG_FACE_WARN("received NonDiscovery, but self-learning disabled: IGNORE");
377  }
378  }
379 
380  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
381  ++this->nInNetInvalid;
382  NFD_LOG_FACE_WARN("received PrefixAnnouncement with Interest: DROP");
383  return;
384  }
385 
386  if (firstPkt.has<lp::PitTokenField>()) {
387  interest->setTag(make_shared<lp::PitToken>(firstPkt.get<lp::PitTokenField>()));
388  }
389 
390  this->receiveInterest(*interest, endpointId);
391 }
392 
393 void
394 GenericLinkService::decodeData(const Block& netPkt, const lp::Packet& firstPkt,
395  const EndpointId& endpointId)
396 {
397  BOOST_ASSERT(netPkt.type() == tlv::Data);
398 
399  // forwarding expects Data to be created with make_shared
400  auto data = make_shared<Data>(netPkt);
401 
402  if (firstPkt.has<lp::NackField>()) {
403  ++this->nInNetInvalid;
404  NFD_LOG_FACE_WARN("received Nack with Data: DROP");
405  return;
406  }
407 
408  if (firstPkt.has<lp::NextHopFaceIdField>()) {
409  ++this->nInNetInvalid;
410  NFD_LOG_FACE_WARN("received NextHopFaceId with Data: DROP");
411  return;
412  }
413 
414  if (firstPkt.has<lp::CachePolicyField>()) {
415  // CachePolicy is unprivileged and does not require allowLocalFields option.
416  // In case of an invalid CachePolicyType, get<lp::CachePolicyField> will throw,
417  // so it's unnecessary to check here.
418  data->setTag(make_shared<lp::CachePolicyTag>(firstPkt.get<lp::CachePolicyField>()));
419  }
420 
421  if (firstPkt.has<lp::IncomingFaceIdField>()) {
422  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
423  }
424 
425  if (firstPkt.has<lp::CongestionMarkField>()) {
426  data->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
427  }
428 
429  if (firstPkt.has<lp::NonDiscoveryField>()) {
430  ++this->nInNetInvalid;
431  NFD_LOG_FACE_WARN("received NonDiscovery with Data: DROP");
432  return;
433  }
434 
435  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
436  if (m_options.allowSelfLearning) {
437  data->setTag(make_shared<lp::PrefixAnnouncementTag>(firstPkt.get<lp::PrefixAnnouncementField>()));
438  }
439  else {
440  NFD_LOG_FACE_WARN("received PrefixAnnouncement, but self-learning disabled: IGNORE");
441  }
442  }
443 
444  this->receiveData(*data, endpointId);
445 }
446 
447 void
448 GenericLinkService::decodeNack(const Block& netPkt, const lp::Packet& firstPkt,
449  const EndpointId& endpointId)
450 {
451  BOOST_ASSERT(netPkt.type() == tlv::Interest);
452  BOOST_ASSERT(firstPkt.has<lp::NackField>());
453 
454  lp::Nack nack((Interest(netPkt)));
455  nack.setHeader(firstPkt.get<lp::NackField>());
456 
457  if (firstPkt.has<lp::NextHopFaceIdField>()) {
458  ++this->nInNetInvalid;
459  NFD_LOG_FACE_WARN("received NextHopFaceId with Nack: DROP");
460  return;
461  }
462 
463  if (firstPkt.has<lp::CachePolicyField>()) {
464  ++this->nInNetInvalid;
465  NFD_LOG_FACE_WARN("received CachePolicy with Nack: DROP");
466  return;
467  }
468 
469  if (firstPkt.has<lp::IncomingFaceIdField>()) {
470  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
471  }
472 
473  if (firstPkt.has<lp::CongestionMarkField>()) {
474  nack.setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
475  }
476 
477  if (firstPkt.has<lp::NonDiscoveryField>()) {
478  ++this->nInNetInvalid;
479  NFD_LOG_FACE_WARN("received NonDiscovery with Nack: DROP");
480  return;
481  }
482 
483  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
484  ++this->nInNetInvalid;
485  NFD_LOG_FACE_WARN("received PrefixAnnouncement with Nack: DROP");
486  return;
487  }
488 
489  this->receiveNack(nack, endpointId);
490 }
491 
492 } // namespace face
493 } // namespace nfd
void processIncomingPacket(const lp::Packet &pkt)
extract and parse all Acks and add Ack for contained Fragment (if any) to AckQueue ...
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
virtual ssize_t getSendQueueLength()
Definition: transport.hpp:257
const ssize_t MTU_UNLIMITED
indicates the transport has no limit on payload size
Definition: transport.hpp:91
#define NFD_LOG_FACE_DEBUG(msg)
Log a message at DEBUG level.
void piggyback(lp::Packet &pkt, ssize_t mtu)
called by GenericLinkService to attach Acks onto an outgoing LpPacket
std::tuple< bool, Block, lp::Packet > receiveFragment(EndpointId remoteEndpoint, const lp::Packet &packet)
adds received fragment to the buffer
bool isEnabled
enables link-layer reliability
uint64_t EndpointId
Identifies a remote endpoint on the link.
Definition: face-common.hpp:65
void setOptions(const Options &options)
set options for fragmenter
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
static constexpr size_t RESERVED_HEADER_SPACE
TxSequence TLV-TYPE (3 octets) + TLV-LENGTH (1 octet) + lp::Sequence (8 octets)
Copyright (c) 2014-2015, Regents of the University of California, Arizona Board of Regents...
Definition: algorithm.hpp:32
signal::Signal< LpReassembler, EndpointId, size_t > beforeTimeout
signals before a partial packet is dropped due to timeout
std::tuple< bool, std::vector< lp::Packet > > fragmentPacket(const lp::Packet &packet, size_t mtu)
fragments a network-layer packet into link-layer packets
ssize_t getMtu() const
Definition: transport.hpp:442
void handleOutgoing(std::vector< lp::Packet > &frags, lp::Packet &&pkt, bool isInterest)
observe outgoing fragment(s) of a network packet and store for potential retransmission ...
void setOptions(const Options &options)
set options for reliability
#define NFD_LOG_INIT(name)
Definition: logger.hpp:31
signal::Signal< LpReliability, Interest > onDroppedInterest
signals on Interest dropped by reliability system for exceeding allowed number of retx ...
constexpr size_t CONGESTION_MARK_SIZE
void setOptions(const Options &options)
set options for reassembler