lp-reliability.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2019, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #include "lp-reliability.hpp"
27 #include "generic-link-service.hpp"
28 #include "transport.hpp"
29 #include "common/global.hpp"
30 
31 namespace nfd {
32 namespace face {
33 
35  : m_options(options)
36  , m_linkService(linkService)
37  , m_firstUnackedFrag(m_unackedFrags.begin())
38  , m_lastTxSeqNo(-1) // set to "-1" to start TxSequence numbers at 0
39  , m_isIdleAckTimerRunning(false)
40 {
41  BOOST_ASSERT(m_linkService != nullptr);
42  BOOST_ASSERT(m_options.idleAckTimerPeriod > 0_ns);
43 }
44 
45 void
47 {
48  BOOST_ASSERT(options.idleAckTimerPeriod > 0_ns);
49 
50  if (m_options.isEnabled && !options.isEnabled) {
51  this->stopIdleAckTimer();
52  }
53 
54  m_options = options;
55 }
56 
57 const GenericLinkService*
59 {
60  return m_linkService;
61 }
62 
63 void
64 LpReliability::handleOutgoing(std::vector<lp::Packet>& frags, lp::Packet&& pkt, bool isInterest)
65 {
66  BOOST_ASSERT(m_options.isEnabled);
67 
68  auto unackedFragsIt = m_unackedFrags.begin();
69  auto sendTime = time::steady_clock::now();
70 
71  auto netPkt = make_shared<NetPkt>(std::move(pkt), isInterest);
72  netPkt->unackedFrags.reserve(frags.size());
73 
74  for (lp::Packet& frag : frags) {
75  // Assign TxSequence number
76  lp::Sequence txSeq = assignTxSequence(frag);
77 
78  // Store LpPacket for future retransmissions
79  unackedFragsIt = m_unackedFrags.emplace_hint(unackedFragsIt,
80  std::piecewise_construct,
81  std::forward_as_tuple(txSeq),
82  std::forward_as_tuple(frag));
83  unackedFragsIt->second.sendTime = sendTime;
84  unackedFragsIt->second.rtoTimer = getScheduler().schedule(m_rto.computeRto(),
85  [=] { onLpPacketLost(txSeq); });
86  unackedFragsIt->second.netPkt = netPkt;
87 
88  if (m_unackedFrags.size() == 1) {
89  m_firstUnackedFrag = m_unackedFrags.begin();
90  }
91 
92  // Add to associated NetPkt
93  netPkt->unackedFrags.push_back(unackedFragsIt);
94  }
95 }
96 
97 void
99 {
100  BOOST_ASSERT(m_options.isEnabled);
101 
102  auto now = time::steady_clock::now();
103 
104  // Extract and parse Acks
105  for (lp::Sequence ackSeq : pkt.list<lp::AckField>()) {
106  auto fragIt = m_unackedFrags.find(ackSeq);
107  if (fragIt == m_unackedFrags.end()) {
108  // Ignore an Ack for an unknown TxSequence number
109  continue;
110  }
111  auto& frag = fragIt->second;
112 
113  // Cancel the RTO timer for the acknowledged fragment
114  frag.rtoTimer.cancel();
115 
116  if (frag.retxCount == 0) {
117  // This sequence had no retransmissions, so use it to calculate the RTO
118  m_rto.addMeasurement(time::duration_cast<RttEstimator::Duration>(now - frag.sendTime));
119  }
120 
121  // Look for frags with TxSequence numbers < ackSeq (allowing for wraparound) and consider them
122  // lost if a configurable number of Acks containing greater TxSequence numbers have been
123  // received.
124  auto lostLpPackets = findLostLpPackets(fragIt);
125 
126  // Remove the fragment from the map of unacknowledged fragments and from its associated network
127  // packet. Potentially increment the start of the window.
128  onLpPacketAcknowledged(fragIt);
129 
130  // This set contains TxSequences that have been removed by onLpPacketLost below because they
131  // were part of a network packet that was removed due to a fragment exceeding retx, as well as
132  // any other TxSequences removed by onLpPacketLost. This prevents onLpPacketLost from being
133  // called later for an invalid iterator.
134  std::set<lp::Sequence> removedLpPackets;
135 
136  // Resend or fail fragments considered lost. Potentially increment the start of the window.
137  for (lp::Sequence txSeq : lostLpPackets) {
138  if (removedLpPackets.find(txSeq) == removedLpPackets.end()) {
139  auto removedThisTxSeq = this->onLpPacketLost(txSeq);
140  for (auto removedTxSeq : removedThisTxSeq) {
141  removedLpPackets.insert(removedTxSeq);
142  }
143  }
144  }
145  }
146 
147  // If packet has Fragment and TxSequence fields, extract TxSequence and add to AckQueue
148  if (pkt.has<lp::FragmentField>() && pkt.has<lp::TxSequenceField>()) {
149  m_ackQueue.push(pkt.get<lp::TxSequenceField>());
150  if (!m_isIdleAckTimerRunning) {
151  this->startIdleAckTimer();
152  }
153  }
154 }
155 
156 void
157 LpReliability::piggyback(lp::Packet& pkt, ssize_t mtu)
158 {
159  BOOST_ASSERT(m_options.isEnabled);
160  BOOST_ASSERT(pkt.wireEncode().type() == lp::tlv::LpPacket);
161 
162  // up to 2 extra octets reserved for potential TLV-LENGTH size increases
163  ssize_t pktSize = pkt.wireEncode().size();
164  ssize_t reservedSpace = tlv::sizeOfVarNumber(ndn::MAX_NDN_PACKET_SIZE) -
165  tlv::sizeOfVarNumber(pktSize);
166  ssize_t remainingSpace = (mtu == MTU_UNLIMITED ? ndn::MAX_NDN_PACKET_SIZE : mtu) - reservedSpace;
167  remainingSpace -= pktSize;
168 
169  while (!m_ackQueue.empty()) {
170  lp::Sequence ackSeq = m_ackQueue.front();
171  // Ack size = Ack TLV-TYPE (3 octets) + TLV-LENGTH (1 octet) + uint64_t (8 octets)
172  const ssize_t ackSize = tlv::sizeOfVarNumber(lp::tlv::Ack) +
173  tlv::sizeOfVarNumber(sizeof(lp::Sequence)) +
174  sizeof(lp::Sequence);
175 
176  if (ackSize > remainingSpace) {
177  break;
178  }
179 
180  pkt.add<lp::AckField>(ackSeq);
181  m_ackQueue.pop();
182  remainingSpace -= ackSize;
183  }
184 }
185 
186 lp::Sequence
187 LpReliability::assignTxSequence(lp::Packet& frag)
188 {
189  lp::Sequence txSeq = ++m_lastTxSeqNo;
190  frag.set<lp::TxSequenceField>(txSeq);
191  if (m_unackedFrags.size() > 0 && m_lastTxSeqNo == m_firstUnackedFrag->first) {
192  NDN_THROW(std::length_error("TxSequence range exceeded"));
193  }
194  return m_lastTxSeqNo;
195 }
196 
197 void
198 LpReliability::startIdleAckTimer()
199 {
200  BOOST_ASSERT(!m_isIdleAckTimerRunning);
201  m_isIdleAckTimerRunning = true;
202 
203  m_idleAckTimer = getScheduler().schedule(m_options.idleAckTimerPeriod, [this] {
204  while (!m_ackQueue.empty()) {
205  m_linkService->requestIdlePacket();
206  }
207 
208  m_isIdleAckTimerRunning = false;
209  });
210 }
211 
212 void
213 LpReliability::stopIdleAckTimer()
214 {
215  m_idleAckTimer.cancel();
216  m_isIdleAckTimerRunning = false;
217 }
218 
219 std::vector<lp::Sequence>
220 LpReliability::findLostLpPackets(LpReliability::UnackedFrags::iterator ackIt)
221 {
222  std::vector<lp::Sequence> lostLpPackets;
223 
224  for (auto it = m_firstUnackedFrag; ; ++it) {
225  if (it == m_unackedFrags.end()) {
226  it = m_unackedFrags.begin();
227  }
228 
229  if (it->first == ackIt->first) {
230  break;
231  }
232 
233  auto& unackedFrag = it->second;
234  unackedFrag.nGreaterSeqAcks++;
235 
236  if (unackedFrag.nGreaterSeqAcks >= m_options.seqNumLossThreshold) {
237  lostLpPackets.push_back(it->first);
238  }
239  }
240 
241  return lostLpPackets;
242 }
243 
244 std::vector<lp::Sequence>
245 LpReliability::onLpPacketLost(lp::Sequence txSeq)
246 {
247  BOOST_ASSERT(m_unackedFrags.count(txSeq) > 0);
248  auto txSeqIt = m_unackedFrags.find(txSeq);
249 
250  auto& txFrag = txSeqIt->second;
251  txFrag.rtoTimer.cancel();
252  auto netPkt = txFrag.netPkt;
253  std::vector<lp::Sequence> removedThisTxSeq;
254 
255  // Check if maximum number of retransmissions exceeded
256  if (txFrag.retxCount >= m_options.maxRetx) {
257  // Delete all LpPackets of NetPkt from m_unackedFrags (except this one)
258  for (size_t i = 0; i < netPkt->unackedFrags.size(); i++) {
259  if (netPkt->unackedFrags[i] != txSeqIt) {
260  removedThisTxSeq.push_back(netPkt->unackedFrags[i]->first);
261  deleteUnackedFrag(netPkt->unackedFrags[i]);
262  }
263  }
264 
265  ++m_linkService->nRetxExhausted;
266 
267  // Notify strategy of dropped Interest (if any)
268  if (netPkt->isInterest) {
269  BOOST_ASSERT(netPkt->pkt.has<lp::FragmentField>());
270  ndn::Buffer::const_iterator fragBegin, fragEnd;
271  std::tie(fragBegin, fragEnd) = netPkt->pkt.get<lp::FragmentField>();
272  Block frag(&*fragBegin, std::distance(fragBegin, fragEnd));
273  onDroppedInterest(Interest(frag));
274  }
275 
276  removedThisTxSeq.push_back(txSeqIt->first);
277  deleteUnackedFrag(txSeqIt);
278  }
279  else {
280  // Assign new TxSequence
281  lp::Sequence newTxSeq = assignTxSequence(txFrag.pkt);
282  netPkt->didRetx = true;
283 
284  // Move fragment to new TxSequence mapping
285  auto newTxFragIt = m_unackedFrags.emplace_hint(
286  m_firstUnackedFrag != m_unackedFrags.end() && m_firstUnackedFrag->first > newTxSeq
287  ? m_firstUnackedFrag
288  : m_unackedFrags.end(),
289  std::piecewise_construct,
290  std::forward_as_tuple(newTxSeq),
291  std::forward_as_tuple(txFrag.pkt));
292  auto& newTxFrag = newTxFragIt->second;
293  newTxFrag.retxCount = txFrag.retxCount + 1;
294  newTxFrag.netPkt = netPkt;
295 
296  // Update associated NetPkt
297  auto fragInNetPkt = std::find(netPkt->unackedFrags.begin(), netPkt->unackedFrags.end(), txSeqIt);
298  BOOST_ASSERT(fragInNetPkt != netPkt->unackedFrags.end());
299  *fragInNetPkt = newTxFragIt;
300 
301  removedThisTxSeq.push_back(txSeqIt->first);
302  deleteUnackedFrag(txSeqIt);
303 
304  // Retransmit fragment
305  m_linkService->sendLpPacket(lp::Packet(newTxFrag.pkt));
306 
307  // Start RTO timer for this sequence
308  newTxFrag.rtoTimer = getScheduler().schedule(m_rto.computeRto(), [=] { onLpPacketLost(newTxSeq); });
309  }
310 
311  return removedThisTxSeq;
312 }
313 
314 void
315 LpReliability::onLpPacketAcknowledged(UnackedFrags::iterator fragIt)
316 {
317  auto netPkt = fragIt->second.netPkt;
318 
319  // Remove from NetPkt unacked fragment list
320  auto fragInNetPkt = std::find(netPkt->unackedFrags.begin(), netPkt->unackedFrags.end(), fragIt);
321  BOOST_ASSERT(fragInNetPkt != netPkt->unackedFrags.end());
322  *fragInNetPkt = netPkt->unackedFrags.back();
323  netPkt->unackedFrags.pop_back();
324 
325  // Check if network-layer packet completely received. If so, increment counters
326  if (netPkt->unackedFrags.empty()) {
327  if (netPkt->didRetx) {
328  ++m_linkService->nRetransmitted;
329  }
330  else {
331  ++m_linkService->nAcknowledged;
332  }
333  }
334 
335  deleteUnackedFrag(fragIt);
336 }
337 
338 void
339 LpReliability::deleteUnackedFrag(UnackedFrags::iterator fragIt)
340 {
341  lp::Sequence firstUnackedTxSeq = m_firstUnackedFrag->first;
342  lp::Sequence currentTxSeq = fragIt->first;
343  auto nextFragIt = m_unackedFrags.erase(fragIt);
344 
345  if (!m_unackedFrags.empty() && firstUnackedTxSeq == currentTxSeq) {
346  // If "first" fragment in send window (allowing for wraparound), increment window begin
347  if (nextFragIt == m_unackedFrags.end()) {
348  m_firstUnackedFrag = m_unackedFrags.begin();
349  }
350  else {
351  m_firstUnackedFrag = nextFragIt;
352  }
353  }
354  else if (m_unackedFrags.empty()) {
355  m_firstUnackedFrag = m_unackedFrags.end();
356  }
357 }
358 
359 LpReliability::UnackedFrag::UnackedFrag(lp::Packet pkt)
360  : pkt(std::move(pkt))
361  , sendTime(time::steady_clock::now())
362  , retxCount(0)
363  , nGreaterSeqAcks(0)
364 {
365 }
366 
367 LpReliability::NetPkt::NetPkt(lp::Packet&& pkt, bool isInterest)
368  : pkt(std::move(pkt))
369  , isInterest(isInterest)
370  , didRetx(false)
371 {
372 }
373 
374 } // namespace face
375 } // namespace nfd
void processIncomingPacket(const lp::Packet &pkt)
extract and parse all Acks and add Ack for contained Fragment (if any) to AckQueue ...
const GenericLinkService * getLinkService() const
const ssize_t MTU_UNLIMITED
indicates the transport has no limit on payload size
Definition: transport.hpp:100
void piggyback(lp::Packet &pkt, ssize_t mtu)
called by GenericLinkService to attach Acks onto an outgoing LpPacket
bool isEnabled
enables link-layer reliability
size_t maxRetx
maximum number of retransmissions for an LpPacket
Scheduler & getScheduler()
Returns the global Scheduler instance for the calling thread.
Definition: global.cpp:45
Table::const_iterator iterator
Definition: cs-internal.hpp:41
size_t seqNumLossThreshold
a fragment is considered lost if this number of fragments with greater sequence numbers are acknowled...
Copyright (c) 2014-2015, Regents of the University of California, Arizona Board of Regents...
Definition: algorithm.hpp:32
LpReliability(const Options &options, GenericLinkService *linkService)
void handleOutgoing(std::vector< lp::Packet > &frags, lp::Packet &&pkt, bool isInterest)
observe outgoing fragment(s) of a network packet and store for potential retransmission ...
time::nanoseconds idleAckTimerPeriod
period between sending pending Acks in an IDLE packet
void setOptions(const Options &options)
set options for reliability
signal::Signal< LpReliability, Interest > onDroppedInterest
signals on Interest dropped by reliability system for exceeding allowed number of retx ...
Duration computeRto() const
void addMeasurement(Duration measure)