lp-reliability.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2018, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #include "lp-reliability.hpp"
27 #include "generic-link-service.hpp"
28 #include "transport.hpp"
29 
30 namespace nfd {
31 namespace face {
32 
34  : m_options(options)
35  , m_linkService(linkService)
36  , m_firstUnackedFrag(m_unackedFrags.begin())
37  , m_lastTxSeqNo(-1) // set to "-1" to start TxSequence numbers at 0
38  , m_isIdleAckTimerRunning(false)
39 {
40  BOOST_ASSERT(m_linkService != nullptr);
41 
42  BOOST_ASSERT(m_options.idleAckTimerPeriod > 0_ns);
43 }
44 
45 void
47 {
48  BOOST_ASSERT(options.idleAckTimerPeriod > 0_ns);
49 
50  if (m_options.isEnabled && !options.isEnabled) {
51  this->stopIdleAckTimer();
52  }
53 
54  m_options = options;
55 }
56 
57 const GenericLinkService*
59 {
60  return m_linkService;
61 }
62 
63 void
64 LpReliability::handleOutgoing(std::vector<lp::Packet>& frags, lp::Packet&& pkt, bool isInterest)
65 {
66  BOOST_ASSERT(m_options.isEnabled);
67 
68  auto unackedFragsIt = m_unackedFrags.begin();
69  auto sendTime = time::steady_clock::now();
70 
71  auto netPkt = make_shared<NetPkt>(std::move(pkt), isInterest);
72  netPkt->unackedFrags.reserve(frags.size());
73 
74  for (lp::Packet& frag : frags) {
75  // Assign TxSequence number
76  lp::Sequence txSeq = assignTxSequence(frag);
77 
78  // Store LpPacket for future retransmissions
79  unackedFragsIt = m_unackedFrags.emplace_hint(unackedFragsIt,
80  std::piecewise_construct,
81  std::forward_as_tuple(txSeq),
82  std::forward_as_tuple(frag));
83  unackedFragsIt->second.sendTime = sendTime;
84  unackedFragsIt->second.rtoTimer = scheduler::schedule(m_rto.computeRto(), [=] { onLpPacketLost(txSeq); });
85  unackedFragsIt->second.netPkt = netPkt;
86 
87  if (m_unackedFrags.size() == 1) {
88  m_firstUnackedFrag = m_unackedFrags.begin();
89  }
90 
91  // Add to associated NetPkt
92  netPkt->unackedFrags.push_back(unackedFragsIt);
93  }
94 }
95 
96 void
98 {
99  BOOST_ASSERT(m_options.isEnabled);
100 
101  auto now = time::steady_clock::now();
102 
103  // Extract and parse Acks
104  for (lp::Sequence ackSeq : pkt.list<lp::AckField>()) {
105  auto fragIt = m_unackedFrags.find(ackSeq);
106  if (fragIt == m_unackedFrags.end()) {
107  // Ignore an Ack for an unknown TxSequence number
108  continue;
109  }
110  auto& frag = fragIt->second;
111 
112  // Cancel the RTO timer for the acknowledged fragment
113  frag.rtoTimer.cancel();
114 
115  if (frag.retxCount == 0) {
116  // This sequence had no retransmissions, so use it to calculate the RTO
117  m_rto.addMeasurement(time::duration_cast<RttEstimator::Duration>(now - frag.sendTime));
118  }
119 
120  // Look for frags with TxSequence numbers < ackSeq (allowing for wraparound) and consider them
121  // lost if a configurable number of Acks containing greater TxSequence numbers have been
122  // received.
123  auto lostLpPackets = findLostLpPackets(fragIt);
124 
125  // Remove the fragment from the map of unacknowledged fragments and from its associated network
126  // packet. Potentially increment the start of the window.
127  onLpPacketAcknowledged(fragIt);
128 
129  // This set contains TxSequences that have been removed by onLpPacketLost below because they
130  // were part of a network packet that was removed due to a fragment exceeding retx, as well as
131  // any other TxSequences removed by onLpPacketLost. This prevents onLpPacketLost from being
132  // called later for an invalid iterator.
133  std::set<lp::Sequence> removedLpPackets;
134 
135  // Resend or fail fragments considered lost. Potentially increment the start of the window.
136  for (lp::Sequence txSeq : lostLpPackets) {
137  if (removedLpPackets.find(txSeq) == removedLpPackets.end()) {
138  auto removedThisTxSeq = this->onLpPacketLost(txSeq);
139  for (auto removedTxSeq : removedThisTxSeq) {
140  removedLpPackets.insert(removedTxSeq);
141  }
142  }
143  }
144  }
145 
146  // If packet has Fragment and TxSequence fields, extract TxSequence and add to AckQueue
147  if (pkt.has<lp::FragmentField>() && pkt.has<lp::TxSequenceField>()) {
148  m_ackQueue.push(pkt.get<lp::TxSequenceField>());
149  if (!m_isIdleAckTimerRunning) {
150  this->startIdleAckTimer();
151  }
152  }
153 }
154 
155 void
156 LpReliability::piggyback(lp::Packet& pkt, ssize_t mtu)
157 {
158  BOOST_ASSERT(m_options.isEnabled);
159  BOOST_ASSERT(pkt.wireEncode().type() == lp::tlv::LpPacket);
160 
161  // up to 2 extra octets reserved for potential TLV-LENGTH size increases
162  ssize_t pktSize = pkt.wireEncode().size();
163  ssize_t reservedSpace = tlv::sizeOfVarNumber(ndn::MAX_NDN_PACKET_SIZE) -
164  tlv::sizeOfVarNumber(pktSize);
165  ssize_t remainingSpace = (mtu == MTU_UNLIMITED ? ndn::MAX_NDN_PACKET_SIZE : mtu) - reservedSpace;
166  remainingSpace -= pktSize;
167 
168  while (!m_ackQueue.empty()) {
169  lp::Sequence ackSeq = m_ackQueue.front();
170  // Ack size = Ack TLV-TYPE (3 octets) + TLV-LENGTH (1 octet) + uint64_t (8 octets)
171  const ssize_t ackSize = tlv::sizeOfVarNumber(lp::tlv::Ack) +
172  tlv::sizeOfVarNumber(sizeof(lp::Sequence)) +
173  sizeof(lp::Sequence);
174 
175  if (ackSize > remainingSpace) {
176  break;
177  }
178 
179  pkt.add<lp::AckField>(ackSeq);
180  m_ackQueue.pop();
181  remainingSpace -= ackSize;
182  }
183 }
184 
185 lp::Sequence
186 LpReliability::assignTxSequence(lp::Packet& frag)
187 {
188  lp::Sequence txSeq = ++m_lastTxSeqNo;
189  frag.set<lp::TxSequenceField>(txSeq);
190  if (m_unackedFrags.size() > 0 && m_lastTxSeqNo == m_firstUnackedFrag->first) {
191  BOOST_THROW_EXCEPTION(std::length_error("TxSequence range exceeded"));
192  }
193  return m_lastTxSeqNo;
194 }
195 
196 void
197 LpReliability::startIdleAckTimer()
198 {
199  BOOST_ASSERT(!m_isIdleAckTimerRunning);
200  m_isIdleAckTimerRunning = true;
201 
202  m_idleAckTimer = scheduler::schedule(m_options.idleAckTimerPeriod, [this] {
203  while (!m_ackQueue.empty()) {
204  m_linkService->requestIdlePacket();
205  }
206 
207  m_isIdleAckTimerRunning = false;
208  });
209 }
210 
211 void
212 LpReliability::stopIdleAckTimer()
213 {
214  m_idleAckTimer.cancel();
215  m_isIdleAckTimerRunning = false;
216 }
217 
218 std::vector<lp::Sequence>
219 LpReliability::findLostLpPackets(LpReliability::UnackedFrags::iterator ackIt)
220 {
221  std::vector<lp::Sequence> lostLpPackets;
222 
223  for (auto it = m_firstUnackedFrag; ; ++it) {
224  if (it == m_unackedFrags.end()) {
225  it = m_unackedFrags.begin();
226  }
227 
228  if (it->first == ackIt->first) {
229  break;
230  }
231 
232  auto& unackedFrag = it->second;
233  unackedFrag.nGreaterSeqAcks++;
234 
235  if (unackedFrag.nGreaterSeqAcks >= m_options.seqNumLossThreshold) {
236  lostLpPackets.push_back(it->first);
237  }
238  }
239 
240  return lostLpPackets;
241 }
242 
243 std::vector<lp::Sequence>
244 LpReliability::onLpPacketLost(lp::Sequence txSeq)
245 {
246  BOOST_ASSERT(m_unackedFrags.count(txSeq) > 0);
247  auto txSeqIt = m_unackedFrags.find(txSeq);
248 
249  auto& txFrag = txSeqIt->second;
250  txFrag.rtoTimer.cancel();
251  auto netPkt = txFrag.netPkt;
252  std::vector<lp::Sequence> removedThisTxSeq;
253 
254  // Check if maximum number of retransmissions exceeded
255  if (txFrag.retxCount >= m_options.maxRetx) {
256  // Delete all LpPackets of NetPkt from m_unackedFrags (except this one)
257  for (size_t i = 0; i < netPkt->unackedFrags.size(); i++) {
258  if (netPkt->unackedFrags[i] != txSeqIt) {
259  removedThisTxSeq.push_back(netPkt->unackedFrags[i]->first);
260  deleteUnackedFrag(netPkt->unackedFrags[i]);
261  }
262  }
263 
264  ++m_linkService->nRetxExhausted;
265 
266  // Notify strategy of dropped Interest (if any)
267  if (netPkt->isInterest) {
268  BOOST_ASSERT(netPkt->pkt.has<lp::FragmentField>());
269  ndn::Buffer::const_iterator fragBegin, fragEnd;
270  std::tie(fragBegin, fragEnd) = netPkt->pkt.get<lp::FragmentField>();
271  Block frag(&*fragBegin, std::distance(fragBegin, fragEnd));
272  onDroppedInterest(Interest(frag));
273  }
274 
275  removedThisTxSeq.push_back(txSeqIt->first);
276  deleteUnackedFrag(txSeqIt);
277  }
278  else {
279  // Assign new TxSequence
280  lp::Sequence newTxSeq = assignTxSequence(txFrag.pkt);
281  netPkt->didRetx = true;
282 
283  // Move fragment to new TxSequence mapping
284  auto newTxFragIt = m_unackedFrags.emplace_hint(
285  m_firstUnackedFrag != m_unackedFrags.end() && m_firstUnackedFrag->first > newTxSeq
286  ? m_firstUnackedFrag
287  : m_unackedFrags.end(),
288  std::piecewise_construct,
289  std::forward_as_tuple(newTxSeq),
290  std::forward_as_tuple(txFrag.pkt));
291  auto& newTxFrag = newTxFragIt->second;
292  newTxFrag.retxCount = txFrag.retxCount + 1;
293  newTxFrag.netPkt = netPkt;
294 
295  // Update associated NetPkt
296  auto fragInNetPkt = std::find(netPkt->unackedFrags.begin(), netPkt->unackedFrags.end(), txSeqIt);
297  BOOST_ASSERT(fragInNetPkt != netPkt->unackedFrags.end());
298  *fragInNetPkt = newTxFragIt;
299 
300  removedThisTxSeq.push_back(txSeqIt->first);
301  deleteUnackedFrag(txSeqIt);
302 
303  // Retransmit fragment
304  m_linkService->sendLpPacket(lp::Packet(newTxFrag.pkt));
305 
306  // Start RTO timer for this sequence
307  newTxFrag.rtoTimer = scheduler::schedule(m_rto.computeRto(), [=] { onLpPacketLost(newTxSeq); });
308  }
309 
310  return removedThisTxSeq;
311 }
312 
313 void
314 LpReliability::onLpPacketAcknowledged(UnackedFrags::iterator fragIt)
315 {
316  auto netPkt = fragIt->second.netPkt;
317 
318  // Remove from NetPkt unacked fragment list
319  auto fragInNetPkt = std::find(netPkt->unackedFrags.begin(), netPkt->unackedFrags.end(), fragIt);
320  BOOST_ASSERT(fragInNetPkt != netPkt->unackedFrags.end());
321  *fragInNetPkt = netPkt->unackedFrags.back();
322  netPkt->unackedFrags.pop_back();
323 
324  // Check if network-layer packet completely received. If so, increment counters
325  if (netPkt->unackedFrags.empty()) {
326  if (netPkt->didRetx) {
327  ++m_linkService->nRetransmitted;
328  }
329  else {
330  ++m_linkService->nAcknowledged;
331  }
332  }
333 
334  deleteUnackedFrag(fragIt);
335 }
336 
337 void
338 LpReliability::deleteUnackedFrag(UnackedFrags::iterator fragIt)
339 {
340  lp::Sequence firstUnackedTxSeq = m_firstUnackedFrag->first;
341  lp::Sequence currentTxSeq = fragIt->first;
342  auto nextFragIt = m_unackedFrags.erase(fragIt);
343 
344  if (!m_unackedFrags.empty() && firstUnackedTxSeq == currentTxSeq) {
345  // If "first" fragment in send window (allowing for wraparound), increment window begin
346  if (nextFragIt == m_unackedFrags.end()) {
347  m_firstUnackedFrag = m_unackedFrags.begin();
348  }
349  else {
350  m_firstUnackedFrag = nextFragIt;
351  }
352  }
353  else if (m_unackedFrags.empty()) {
354  m_firstUnackedFrag = m_unackedFrags.end();
355  }
356 }
357 
358 LpReliability::UnackedFrag::UnackedFrag(lp::Packet pkt)
359  : pkt(std::move(pkt))
360  , sendTime(time::steady_clock::now())
361  , retxCount(0)
362  , nGreaterSeqAcks(0)
363 {
364 }
365 
366 LpReliability::NetPkt::NetPkt(lp::Packet&& pkt, bool isInterest)
367  : pkt(std::move(pkt))
368  , isInterest(isInterest)
369  , didRetx(false)
370 {
371 }
372 
373 } // namespace face
374 } // namespace nfd
void processIncomingPacket(const lp::Packet &pkt)
extract and parse all Acks and add Ack for contained Fragment (if any) to AckQueue ...
const GenericLinkService * getLinkService() const
const ssize_t MTU_UNLIMITED
indicates the transport has no limit on payload size
Definition: transport.hpp:96
void piggyback(lp::Packet &pkt, ssize_t mtu)
called by GenericLinkService to attach Acks onto an outgoing LpPacket
bool isEnabled
enables link-layer reliability
size_t maxRetx
maximum number of retransmissions for an LpPacket
Table::const_iterator iterator
Definition: cs-internal.hpp:41
size_t seqNumLossThreshold
a fragment is considered lost if this number of fragments with greater sequence numbers are acknowled...
Copyright (c) 2014-2015, Regents of the University of California, Arizona Board of Regents...
Definition: algorithm.hpp:32
LpReliability(const Options &options, GenericLinkService *linkService)
void handleOutgoing(std::vector< lp::Packet > &frags, lp::Packet &&pkt, bool isInterest)
observe outgoing fragment(s) of a network packet and store for potential retransmission ...
time::nanoseconds idleAckTimerPeriod
period between sending pending Acks in an IDLE packet
void setOptions(const Options &options)
set options for reliability
signal::Signal< LpReliability, Interest > onDroppedInterest
signals on Interest dropped by reliability system for exceeding allowed number of retx ...
EventId schedule(time::nanoseconds after, const EventCallback &event)
Schedule an event.
Definition: scheduler.cpp:47
Duration computeRto() const
void addMeasurement(Duration measure)