generic-link-service.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2019, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #include "generic-link-service.hpp"
27 
28 #include <ndn-cxx/lp/tags.hpp>
29 
30 #include <cmath>
31 
32 namespace nfd {
33 namespace face {
34 
35 NFD_LOG_INIT(GenericLinkService);
36 
37 constexpr size_t CONGESTION_MARK_SIZE = tlv::sizeOfVarNumber(lp::tlv::CongestionMark) + // type
38  tlv::sizeOfVarNumber(sizeof(uint64_t)) + // length
39  tlv::sizeOfNonNegativeInteger(UINT64_MAX); // value
40 
41 constexpr uint32_t DEFAULT_CONGESTION_THRESHOLD_DIVISOR = 2;
42 
44  : m_options(options)
45  , m_fragmenter(m_options.fragmenterOptions, this)
46  , m_reassembler(m_options.reassemblerOptions, this)
47  , m_reliability(m_options.reliabilityOptions, this)
48  , m_lastSeqNo(-2)
49  , m_nextMarkTime(time::steady_clock::TimePoint::max())
50  , m_lastMarkTime(time::steady_clock::TimePoint::min())
51  , m_nMarkedSinceInMarkingState(0)
52 {
53  m_reassembler.beforeTimeout.connect([this] (auto...) { ++this->nReassemblyTimeouts; });
54  m_reliability.onDroppedInterest.connect([this] (const auto& i) { this->notifyDroppedInterest(i); });
55  nReassembling.observe(&m_reassembler);
56 }
57 
58 void
60 {
61  m_options = options;
62  m_fragmenter.setOptions(m_options.fragmenterOptions);
63  m_reassembler.setOptions(m_options.reassemblerOptions);
64  m_reliability.setOptions(m_options.reliabilityOptions);
65 }
66 
67 void
68 GenericLinkService::requestIdlePacket()
69 {
70  // No need to request Acks to attach to this packet from LpReliability, as they are already
71  // attached in sendLpPacket
72  this->sendLpPacket({});
73 }
74 
75 void
76 GenericLinkService::sendLpPacket(lp::Packet&& pkt)
77 {
78  const ssize_t mtu = this->getTransport()->getMtu();
79 
80  if (m_options.reliabilityOptions.isEnabled) {
81  m_reliability.piggyback(pkt, mtu);
82  }
83 
84  if (m_options.allowCongestionMarking) {
85  checkCongestionLevel(pkt);
86  }
87 
88  Transport::Packet tp(pkt.wireEncode());
89  if (mtu != MTU_UNLIMITED && tp.packet.size() > static_cast<size_t>(mtu)) {
90  ++this->nOutOverMtu;
91  NFD_LOG_FACE_WARN("attempted to send packet over MTU limit");
92  return;
93  }
94  this->sendPacket(std::move(tp));
95 }
96 
97 void
98 GenericLinkService::doSendInterest(const Interest& interest)
99 {
100  lp::Packet lpPacket(interest.wireEncode());
101 
102  encodeLpFields(interest, lpPacket);
103 
104  this->sendNetPacket(std::move(lpPacket), true);
105 }
106 
107 void
108 GenericLinkService::doSendData(const Data& data)
109 {
110  lp::Packet lpPacket(data.wireEncode());
111 
112  encodeLpFields(data, lpPacket);
113 
114  this->sendNetPacket(std::move(lpPacket), false);
115 }
116 
117 void
118 GenericLinkService::doSendNack(const lp::Nack& nack)
119 {
120  lp::Packet lpPacket(nack.getInterest().wireEncode());
121  lpPacket.add<lp::NackField>(nack.getHeader());
122 
123  encodeLpFields(nack, lpPacket);
124 
125  this->sendNetPacket(std::move(lpPacket), false);
126 }
127 
128 void
129 GenericLinkService::encodeLpFields(const ndn::PacketBase& netPkt, lp::Packet& lpPacket)
130 {
131  if (m_options.allowLocalFields) {
132  shared_ptr<lp::IncomingFaceIdTag> incomingFaceIdTag = netPkt.getTag<lp::IncomingFaceIdTag>();
133  if (incomingFaceIdTag != nullptr) {
134  lpPacket.add<lp::IncomingFaceIdField>(*incomingFaceIdTag);
135  }
136  }
137 
138  shared_ptr<lp::CongestionMarkTag> congestionMarkTag = netPkt.getTag<lp::CongestionMarkTag>();
139  if (congestionMarkTag != nullptr) {
140  lpPacket.add<lp::CongestionMarkField>(*congestionMarkTag);
141  }
142 
143  if (m_options.allowSelfLearning) {
144  shared_ptr<lp::NonDiscoveryTag> nonDiscoveryTag = netPkt.getTag<lp::NonDiscoveryTag>();
145  if (nonDiscoveryTag != nullptr) {
146  lpPacket.add<lp::NonDiscoveryField>(*nonDiscoveryTag);
147  }
148 
149  shared_ptr<lp::PrefixAnnouncementTag> prefixAnnouncementTag = netPkt.getTag<lp::PrefixAnnouncementTag>();
150  if (prefixAnnouncementTag != nullptr) {
151  lpPacket.add<lp::PrefixAnnouncementField>(*prefixAnnouncementTag);
152  }
153  }
154 }
155 
156 void
157 GenericLinkService::sendNetPacket(lp::Packet&& pkt, bool isInterest)
158 {
159  std::vector<lp::Packet> frags;
160  ssize_t mtu = this->getTransport()->getMtu();
161 
162  // Make space for feature fields in fragments
163  if (m_options.reliabilityOptions.isEnabled && mtu != MTU_UNLIMITED) {
165  }
166 
167  if (m_options.allowCongestionMarking && mtu != MTU_UNLIMITED) {
168  mtu -= CONGESTION_MARK_SIZE;
169  }
170 
171  BOOST_ASSERT(mtu == MTU_UNLIMITED || mtu > 0);
172 
173  if (m_options.allowFragmentation && mtu != MTU_UNLIMITED) {
174  bool isOk = false;
175  std::tie(isOk, frags) = m_fragmenter.fragmentPacket(pkt, mtu);
176  if (!isOk) {
177  // fragmentation failed (warning is logged by LpFragmenter)
178  ++this->nFragmentationErrors;
179  return;
180  }
181  }
182  else {
183  if (m_options.reliabilityOptions.isEnabled) {
184  frags.push_back(pkt);
185  }
186  else {
187  frags.push_back(std::move(pkt));
188  }
189  }
190 
191  if (frags.size() == 1) {
192  // even if indexed fragmentation is enabled, the fragmenter should not
193  // fragment the packet if it can fit in MTU
194  BOOST_ASSERT(!frags.front().has<lp::FragIndexField>());
195  BOOST_ASSERT(!frags.front().has<lp::FragCountField>());
196  }
197 
198  // Only assign sequences to fragments if packet contains more than 1 fragment
199  if (frags.size() > 1) {
200  // Assign sequences to all fragments
201  this->assignSequences(frags);
202  }
203 
204  if (m_options.reliabilityOptions.isEnabled && frags.front().has<lp::FragmentField>()) {
205  m_reliability.handleOutgoing(frags, std::move(pkt), isInterest);
206  }
207 
208  for (lp::Packet& frag : frags) {
209  this->sendLpPacket(std::move(frag));
210  }
211 }
212 
213 void
214 GenericLinkService::assignSequence(lp::Packet& pkt)
215 {
216  pkt.set<lp::SequenceField>(++m_lastSeqNo);
217 }
218 
219 void
220 GenericLinkService::assignSequences(std::vector<lp::Packet>& pkts)
221 {
222  std::for_each(pkts.begin(), pkts.end(), [this] (auto& pkt) { this->assignSequence(pkt); });
223 }
224 
225 void
226 GenericLinkService::checkCongestionLevel(lp::Packet& pkt)
227 {
228  ssize_t sendQueueLength = getTransport()->getSendQueueLength();
229  // This operation requires that the transport supports retrieving current send queue length
230  if (sendQueueLength < 0) {
231  return;
232  }
233 
234  // To avoid overflowing the queue, set the congestion threshold to at least half of the send
235  // queue capacity.
236  size_t congestionThreshold = m_options.defaultCongestionThreshold;
237  if (getTransport()->getSendQueueCapacity() >= 0) {
238  congestionThreshold = std::min(congestionThreshold,
239  static_cast<size_t>(getTransport()->getSendQueueCapacity()) /
240  DEFAULT_CONGESTION_THRESHOLD_DIVISOR);
241  }
242 
243  if (sendQueueLength > 0) {
244  NFD_LOG_FACE_TRACE("txqlen=" << sendQueueLength << " threshold=" << congestionThreshold <<
245  " capacity=" << getTransport()->getSendQueueCapacity());
246  }
247 
248  if (static_cast<size_t>(sendQueueLength) > congestionThreshold) { // Send queue is congested
249  const auto now = time::steady_clock::now();
250  if (now >= m_nextMarkTime || now >= m_lastMarkTime + m_options.baseCongestionMarkingInterval) {
251  // Mark at most one initial packet per baseCongestionMarkingInterval
252  if (m_nMarkedSinceInMarkingState == 0) {
253  m_nextMarkTime = now;
254  }
255 
256  // Time to mark packet
257  pkt.set<lp::CongestionMarkField>(1);
259  NFD_LOG_FACE_DEBUG("LpPacket was marked as congested");
260 
261  ++m_nMarkedSinceInMarkingState;
262  // Decrease the marking interval by the inverse of the square root of the number of packets
263  // marked in this incident of congestion
264  m_nextMarkTime += time::nanoseconds(static_cast<time::nanoseconds::rep>(
265  m_options.baseCongestionMarkingInterval.count() /
266  std::sqrt(m_nMarkedSinceInMarkingState)));
267  m_lastMarkTime = now;
268  }
269  }
270  else if (m_nextMarkTime != time::steady_clock::TimePoint::max()) {
271  // Congestion incident has ended, so reset
272  NFD_LOG_FACE_DEBUG("Send queue length dropped below congestion threshold");
273  m_nextMarkTime = time::steady_clock::TimePoint::max();
274  m_nMarkedSinceInMarkingState = 0;
275  }
276 }
277 
278 void
279 GenericLinkService::doReceivePacket(Transport::Packet&& packet)
280 {
281  try {
282  lp::Packet pkt(packet.packet);
283 
284  if (m_options.reliabilityOptions.isEnabled) {
285  m_reliability.processIncomingPacket(pkt);
286  }
287 
288  if (!pkt.has<lp::FragmentField>()) {
289  NFD_LOG_FACE_TRACE("received IDLE packet: DROP");
290  return;
291  }
292 
293  if ((pkt.has<lp::FragIndexField>() || pkt.has<lp::FragCountField>()) &&
294  !m_options.allowReassembly) {
295  NFD_LOG_FACE_WARN("received fragment, but reassembly disabled: DROP");
296  return;
297  }
298 
299  bool isReassembled = false;
300  Block netPkt;
301  lp::Packet firstPkt;
302  std::tie(isReassembled, netPkt, firstPkt) = m_reassembler.receiveFragment(packet.remoteEndpoint,
303  pkt);
304  if (isReassembled) {
305  this->decodeNetPacket(netPkt, firstPkt);
306  }
307  }
308  catch (const tlv::Error& e) {
309  ++this->nInLpInvalid;
310  NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
311  }
312 }
313 
314 void
315 GenericLinkService::decodeNetPacket(const Block& netPkt, const lp::Packet& firstPkt)
316 {
317  try {
318  switch (netPkt.type()) {
319  case tlv::Interest:
320  if (firstPkt.has<lp::NackField>()) {
321  this->decodeNack(netPkt, firstPkt);
322  }
323  else {
324  this->decodeInterest(netPkt, firstPkt);
325  }
326  break;
327  case tlv::Data:
328  this->decodeData(netPkt, firstPkt);
329  break;
330  default:
331  ++this->nInNetInvalid;
332  NFD_LOG_FACE_WARN("unrecognized network-layer packet TLV-TYPE " << netPkt.type() << ": DROP");
333  return;
334  }
335  }
336  catch (const tlv::Error& e) {
337  ++this->nInNetInvalid;
338  NFD_LOG_FACE_WARN("packet parse error (" << e.what() << "): DROP");
339  }
340 }
341 
342 void
343 GenericLinkService::decodeInterest(const Block& netPkt, const lp::Packet& firstPkt)
344 {
345  BOOST_ASSERT(netPkt.type() == tlv::Interest);
346  BOOST_ASSERT(!firstPkt.has<lp::NackField>());
347 
348  // forwarding expects Interest to be created with make_shared
349  auto interest = make_shared<Interest>(netPkt);
350 
351  if (firstPkt.has<lp::NextHopFaceIdField>()) {
352  if (m_options.allowLocalFields) {
353  interest->setTag(make_shared<lp::NextHopFaceIdTag>(firstPkt.get<lp::NextHopFaceIdField>()));
354  }
355  else {
356  NFD_LOG_FACE_WARN("received NextHopFaceId, but local fields disabled: DROP");
357  return;
358  }
359  }
360 
361  if (firstPkt.has<lp::CachePolicyField>()) {
362  ++this->nInNetInvalid;
363  NFD_LOG_FACE_WARN("received CachePolicy with Interest: DROP");
364  return;
365  }
366 
367  if (firstPkt.has<lp::IncomingFaceIdField>()) {
368  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
369  }
370 
371  if (firstPkt.has<lp::CongestionMarkField>()) {
372  interest->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
373  }
374 
375  if (firstPkt.has<lp::NonDiscoveryField>()) {
376  if (m_options.allowSelfLearning) {
377  interest->setTag(make_shared<lp::NonDiscoveryTag>(firstPkt.get<lp::NonDiscoveryField>()));
378  }
379  else {
380  NFD_LOG_FACE_WARN("received NonDiscovery, but self-learning disabled: IGNORE");
381  }
382  }
383 
384  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
385  ++this->nInNetInvalid;
386  NFD_LOG_FACE_WARN("received PrefixAnnouncement with Interest: DROP");
387  return;
388  }
389 
390  this->receiveInterest(*interest);
391 }
392 
393 void
394 GenericLinkService::decodeData(const Block& netPkt, const lp::Packet& firstPkt)
395 {
396  BOOST_ASSERT(netPkt.type() == tlv::Data);
397 
398  // forwarding expects Data to be created with make_shared
399  auto data = make_shared<Data>(netPkt);
400 
401  if (firstPkt.has<lp::NackField>()) {
402  ++this->nInNetInvalid;
403  NFD_LOG_FACE_WARN("received Nack with Data: DROP");
404  return;
405  }
406 
407  if (firstPkt.has<lp::NextHopFaceIdField>()) {
408  ++this->nInNetInvalid;
409  NFD_LOG_FACE_WARN("received NextHopFaceId with Data: DROP");
410  return;
411  }
412 
413  if (firstPkt.has<lp::CachePolicyField>()) {
414  // CachePolicy is unprivileged and does not require allowLocalFields option.
415  // In case of an invalid CachePolicyType, get<lp::CachePolicyField> will throw,
416  // so it's unnecessary to check here.
417  data->setTag(make_shared<lp::CachePolicyTag>(firstPkt.get<lp::CachePolicyField>()));
418  }
419 
420  if (firstPkt.has<lp::IncomingFaceIdField>()) {
421  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
422  }
423 
424  if (firstPkt.has<lp::CongestionMarkField>()) {
425  data->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
426  }
427 
428  if (firstPkt.has<lp::NonDiscoveryField>()) {
429  ++this->nInNetInvalid;
430  NFD_LOG_FACE_WARN("received NonDiscovery with Data: DROP");
431  return;
432  }
433 
434  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
435  if (m_options.allowSelfLearning) {
436  data->setTag(make_shared<lp::PrefixAnnouncementTag>(firstPkt.get<lp::PrefixAnnouncementField>()));
437  }
438  else {
439  NFD_LOG_FACE_WARN("received PrefixAnnouncement, but self-learning disabled: IGNORE");
440  }
441  }
442 
443  this->receiveData(*data);
444 }
445 
446 void
447 GenericLinkService::decodeNack(const Block& netPkt, const lp::Packet& firstPkt)
448 {
449  BOOST_ASSERT(netPkt.type() == tlv::Interest);
450  BOOST_ASSERT(firstPkt.has<lp::NackField>());
451 
452  lp::Nack nack((Interest(netPkt)));
453  nack.setHeader(firstPkt.get<lp::NackField>());
454 
455  if (firstPkt.has<lp::NextHopFaceIdField>()) {
456  ++this->nInNetInvalid;
457  NFD_LOG_FACE_WARN("received NextHopFaceId with Nack: DROP");
458  return;
459  }
460 
461  if (firstPkt.has<lp::CachePolicyField>()) {
462  ++this->nInNetInvalid;
463  NFD_LOG_FACE_WARN("received CachePolicy with Nack: DROP");
464  return;
465  }
466 
467  if (firstPkt.has<lp::IncomingFaceIdField>()) {
468  NFD_LOG_FACE_WARN("received IncomingFaceId: IGNORE");
469  }
470 
471  if (firstPkt.has<lp::CongestionMarkField>()) {
472  nack.setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
473  }
474 
475  if (firstPkt.has<lp::NonDiscoveryField>()) {
476  ++this->nInNetInvalid;
477  NFD_LOG_FACE_WARN("received NonDiscovery with Nack: DROP");
478  return;
479  }
480 
481  if (firstPkt.has<lp::PrefixAnnouncementField>()) {
482  ++this->nInNetInvalid;
483  NFD_LOG_FACE_WARN("received PrefixAnnouncement with Nack: DROP");
484  return;
485  }
486 
487  this->receiveNack(nack);
488 }
489 
490 } // namespace face
491 } // namespace nfd
void processIncomingPacket(const lp::Packet &pkt)
extract and parse all Acks and add Ack for contained Fragment (if any) to AckQueue ...
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
Definition: face-log.hpp:79
virtual ssize_t getSendQueueLength()
Definition: transport.hpp:287
const ssize_t MTU_UNLIMITED
indicates the transport has no limit on payload size
Definition: transport.hpp:100
#define NFD_LOG_FACE_DEBUG(msg)
Log a message at DEBUG level.
Definition: face-log.hpp:82
ssize_t getMtu() const
Definition: transport.hpp:469
void piggyback(lp::Packet &pkt, ssize_t mtu)
called by GenericLinkService to attach Acks onto an outgoing LpPacket
std::tuple< bool, Block, lp::Packet > receiveFragment(EndpointId remoteEndpoint, const lp::Packet &packet)
adds received fragment to buffer
bool isEnabled
enables link-layer reliability
void setOptions(const Options &options)
set options for fragmenter
stores a packet along with the remote endpoint
Definition: transport.hpp:122
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
Definition: face-log.hpp:88
static constexpr size_t RESERVED_HEADER_SPACE
TxSequence TLV-TYPE (3 octets) + TxSequence TLV-LENGTH (1 octet) + sizeof(lp::Sequence) ...
Copyright (c) 2014-2015, Regents of the University of California, Arizona Board of Regents...
Definition: algorithm.hpp:32
signal::Signal< LpReassembler, EndpointId, size_t > beforeTimeout
signals before a partial packet is dropped due to timeout
std::tuple< bool, std::vector< lp::Packet > > fragmentPacket(const lp::Packet &packet, size_t mtu)
fragments a network-layer packet into link-layer packets
constexpr uint32_t DEFAULT_CONGESTION_THRESHOLD_DIVISOR
void handleOutgoing(std::vector< lp::Packet > &frags, lp::Packet &&pkt, bool isInterest)
observe outgoing fragment(s) of a network packet and store for potential retransmission ...
void setOptions(const Options &options)
set options for reliability
#define NFD_LOG_INIT(name)
Definition: logger.hpp:31
signal::Signal< LpReliability, Interest > onDroppedInterest
signals on Interest dropped by reliability system for exceeding allowed number of retx ...
constexpr size_t CONGESTION_MARK_SIZE
void setOptions(const Options &options)
set options for reassembler