asf-strategy.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2022, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #include "asf-strategy.hpp"
27 #include "algorithm.hpp"
28 #include "common/global.hpp"
29 #include "common/logger.hpp"
30 
31 namespace nfd::fw::asf {
32 
33 NFD_LOG_INIT(AsfStrategy);
35 
36 AsfStrategy::AsfStrategy(Forwarder& forwarder, const Name& name)
37  : Strategy(forwarder)
38  , m_measurements(getMeasurements())
39  , m_probing(m_measurements)
40 {
42  if (parsed.version && *parsed.version != getStrategyName()[-1].toVersion()) {
43  NDN_THROW(std::invalid_argument(
44  "AsfStrategy does not support version " + to_string(*parsed.version)));
45  }
46 
48  m_retxSuppression = RetxSuppressionExponential::construct(params);
49  auto probingInterval = params.getOrDefault<time::milliseconds::rep>("probing-interval",
50  m_probing.getProbingInterval().count());
51  m_probing.setProbingInterval(time::milliseconds(probingInterval));
52  m_nMaxTimeouts = params.getOrDefault<size_t>("max-timeouts", m_nMaxTimeouts);
53 
55 
56  NDN_LOG_DEBUG(*m_retxSuppression);
57  NFD_LOG_DEBUG("probing-interval=" << m_probing.getProbingInterval()
58  << " max-timeouts=" << m_nMaxTimeouts);
59 }
60 
61 const Name&
63 {
64  static const auto strategyName = Name("/localhost/nfd/strategy/asf").appendVersion(4);
65  return strategyName;
66 }
67 
68 void
69 AsfStrategy::afterReceiveInterest(const Interest& interest, const FaceEndpoint& ingress,
70  const shared_ptr<pit::Entry>& pitEntry)
71 {
72  const auto& fibEntry = this->lookupFib(*pitEntry);
73 
74  // Check if the interest is new and, if so, skip the retx suppression check
75  if (!hasPendingOutRecords(*pitEntry)) {
76  auto* faceToUse = getBestFaceForForwarding(interest, ingress.face, fibEntry, pitEntry);
77  if (faceToUse == nullptr) {
78  NFD_LOG_DEBUG(interest << " new-interest from=" << ingress << " no-nexthop");
79  sendNoRouteNack(ingress.face, pitEntry);
80  }
81  else {
82  NFD_LOG_DEBUG(interest << " new-interest from=" << ingress << " forward-to=" << faceToUse->getId());
83  forwardInterest(interest, *faceToUse, fibEntry, pitEntry);
84  sendProbe(interest, ingress, *faceToUse, fibEntry, pitEntry);
85  }
86  return;
87  }
88 
89  auto* faceToUse = getBestFaceForForwarding(interest, ingress.face, fibEntry, pitEntry, false);
90  if (faceToUse != nullptr) {
91  auto suppressResult = m_retxSuppression->decidePerUpstream(*pitEntry, *faceToUse);
92  if (suppressResult == RetxSuppressionResult::SUPPRESS) {
93  // Cannot be sent on this face, interest was received within the suppression window
94  NFD_LOG_DEBUG(interest << " retx-interest from=" << ingress
95  << " forward-to=" << faceToUse->getId() << " suppressed");
96  }
97  else {
98  // The retx arrived after the suppression period: forward it but don't probe, because
99  // probing was done earlier for this interest when it was newly received
100  NFD_LOG_DEBUG(interest << " retx-interest from=" << ingress << " forward-to=" << faceToUse->getId());
101  auto* outRecord = forwardInterest(interest, *faceToUse, fibEntry, pitEntry);
102  if (outRecord && suppressResult == RetxSuppressionResult::FORWARD) {
103  m_retxSuppression->incrementIntervalForOutRecord(*outRecord);
104  }
105  }
106  return;
107  }
108 
109  // If all eligible faces have been used (i.e., they all have a pending out-record),
110  // choose the nexthop with the earliest out-record
111  const auto& nexthops = fibEntry.getNextHops();
112  auto it = findEligibleNextHopWithEarliestOutRecord(ingress.face, interest, nexthops, pitEntry);
113  if (it == nexthops.end()) {
114  NFD_LOG_DEBUG(interest << " retx-interest from=" << ingress << " no eligible nexthop");
115  return;
116  }
117  auto& outFace = it->getFace();
118  auto suppressResult = m_retxSuppression->decidePerUpstream(*pitEntry, outFace);
119  if (suppressResult == RetxSuppressionResult::SUPPRESS) {
120  NFD_LOG_DEBUG(interest << " retx-interest from=" << ingress
121  << " retry-to=" << outFace.getId() << " suppressed");
122  }
123  else {
124  NFD_LOG_DEBUG(interest << " retx-interest from=" << ingress << " retry-to=" << outFace.getId());
125  // sendInterest() is used here instead of forwardInterest() because the measurements info
126  // were already attached to this face in the previous forwarding
127  auto* outRecord = sendInterest(interest, outFace, pitEntry);
128  if (outRecord && suppressResult == RetxSuppressionResult::FORWARD) {
129  m_retxSuppression->incrementIntervalForOutRecord(*outRecord);
130  }
131  }
132 }
133 
134 void
135 AsfStrategy::beforeSatisfyInterest(const Data& data, const FaceEndpoint& ingress,
136  const shared_ptr<pit::Entry>& pitEntry)
137 {
138  NamespaceInfo* namespaceInfo = m_measurements.getNamespaceInfo(pitEntry->getName());
139  if (namespaceInfo == nullptr) {
140  NFD_LOG_DEBUG(pitEntry->getName() << " data from=" << ingress << " no-measurements");
141  return;
142  }
143 
144  // Record the RTT between the Interest out to Data in
145  FaceInfo* faceInfo = namespaceInfo->getFaceInfo(ingress.face.getId());
146  if (faceInfo == nullptr) {
147  NFD_LOG_DEBUG(pitEntry->getName() << " data from=" << ingress << " no-face-info");
148  return;
149  }
150 
151  auto outRecord = pitEntry->getOutRecord(ingress.face);
152  if (outRecord == pitEntry->out_end()) {
153  NFD_LOG_DEBUG(pitEntry->getName() << " data from=" << ingress << " no-out-record");
154  }
155  else {
156  faceInfo->recordRtt(time::steady_clock::now() - outRecord->getLastRenewed());
157  NFD_LOG_DEBUG(pitEntry->getName() << " data from=" << ingress
158  << " rtt=" << faceInfo->getLastRtt() << " srtt=" << faceInfo->getSrtt());
159  }
160 
161  // Extend lifetime for measurements associated with Face
162  namespaceInfo->extendFaceInfoLifetime(*faceInfo, ingress.face.getId());
163  // Extend PIT entry timer to allow slower probes to arrive
164  this->setExpiryTimer(pitEntry, 50_ms);
165  faceInfo->cancelTimeout(data.getName());
166 }
167 
168 void
169 AsfStrategy::afterReceiveNack(const lp::Nack& nack, const FaceEndpoint& ingress,
170  const shared_ptr<pit::Entry>& pitEntry)
171 {
172  NFD_LOG_DEBUG(nack.getInterest() << " nack from=" << ingress << " reason=" << nack.getReason());
173  onTimeoutOrNack(pitEntry->getName(), ingress.face.getId(), true);
174 }
175 
177 AsfStrategy::forwardInterest(const Interest& interest, Face& outFace, const fib::Entry& fibEntry,
178  const shared_ptr<pit::Entry>& pitEntry)
179 {
180  const auto& interestName = interest.getName();
181  auto faceId = outFace.getId();
182 
183  auto* outRecord = sendInterest(interest, outFace, pitEntry);
184 
185  FaceInfo& faceInfo = m_measurements.getOrCreateFaceInfo(fibEntry, interestName, faceId);
186 
187  // Refresh measurements since Face is being used for forwarding
188  NamespaceInfo& namespaceInfo = m_measurements.getOrCreateNamespaceInfo(fibEntry, interestName);
189  namespaceInfo.extendFaceInfoLifetime(faceInfo, faceId);
190 
191  if (!faceInfo.isTimeoutScheduled()) {
192  auto timeout = faceInfo.scheduleTimeout(interestName,
193  [this, name = interestName, faceId] {
194  onTimeoutOrNack(name, faceId, false);
195  });
196  NFD_LOG_TRACE("Scheduled timeout for " << fibEntry.getPrefix() << " to=" << faceId
197  << " in " << time::duration_cast<time::milliseconds>(timeout));
198  }
199 
200  return outRecord;
201 }
202 
203 void
204 AsfStrategy::sendProbe(const Interest& interest, const FaceEndpoint& ingress, const Face& faceToUse,
205  const fib::Entry& fibEntry, const shared_ptr<pit::Entry>& pitEntry)
206 {
207  if (!m_probing.isProbingNeeded(fibEntry, interest.getName()))
208  return;
209 
210  Face* faceToProbe = m_probing.getFaceToProbe(ingress.face, interest, fibEntry, faceToUse);
211  if (faceToProbe == nullptr)
212  return;
213 
214  Interest probeInterest(interest);
215  probeInterest.refreshNonce();
216  NFD_LOG_TRACE("Sending probe " << probeInterest << " to=" << faceToProbe->getId());
217  forwardInterest(probeInterest, *faceToProbe, fibEntry, pitEntry);
218 
219  m_probing.afterForwardingProbe(fibEntry, interest.getName());
220 }
221 
222 struct FaceStats
223 {
224  Face* face;
225  time::nanoseconds rtt;
226  time::nanoseconds srtt;
227  uint64_t cost;
228 };
229 
230 struct FaceStatsCompare
231 {
232  bool
233  operator()(const FaceStats& lhs, const FaceStats& rhs) const
234  {
235  time::nanoseconds lhsValue = getValueForSorting(lhs);
236  time::nanoseconds rhsValue = getValueForSorting(rhs);
237 
238  // Sort by RTT and then by cost
239  return std::tie(lhsValue, lhs.cost) < std::tie(rhsValue, rhs.cost);
240  }
241 
242 private:
243  static time::nanoseconds
244  getValueForSorting(const FaceStats& stats)
245  {
246  // These values allow faces with no measurements to be ranked better than timeouts
247  // srtt < RTT_NO_MEASUREMENT < RTT_TIMEOUT
248  if (stats.rtt == FaceInfo::RTT_TIMEOUT) {
249  return time::nanoseconds::max();
250  }
251  else if (stats.rtt == FaceInfo::RTT_NO_MEASUREMENT) {
252  return time::nanoseconds::max() / 2;
253  }
254  else {
255  return stats.srtt;
256  }
257  }
258 };
259 
260 Face*
261 AsfStrategy::getBestFaceForForwarding(const Interest& interest, const Face& inFace,
262  const fib::Entry& fibEntry, const shared_ptr<pit::Entry>& pitEntry,
263  bool isInterestNew)
264 {
265  std::set<FaceStats, FaceStatsCompare> rankedFaces;
266 
267  auto now = time::steady_clock::now();
268  for (const auto& nh : fibEntry.getNextHops()) {
269  if (!isNextHopEligible(inFace, interest, nh, pitEntry, !isInterestNew, now)) {
270  continue;
271  }
272 
273  const FaceInfo* info = m_measurements.getFaceInfo(fibEntry, interest.getName(), nh.getFace().getId());
274  if (info == nullptr) {
275  rankedFaces.insert({&nh.getFace(), FaceInfo::RTT_NO_MEASUREMENT,
276  FaceInfo::RTT_NO_MEASUREMENT, nh.getCost()});
277  }
278  else {
279  rankedFaces.insert({&nh.getFace(), info->getLastRtt(), info->getSrtt(), nh.getCost()});
280  }
281  }
282 
283  auto it = rankedFaces.begin();
284  return it != rankedFaces.end() ? it->face : nullptr;
285 }
286 
287 void
288 AsfStrategy::onTimeoutOrNack(const Name& interestName, FaceId faceId, bool isNack)
289 {
290  NamespaceInfo* namespaceInfo = m_measurements.getNamespaceInfo(interestName);
291  if (namespaceInfo == nullptr) {
292  NFD_LOG_TRACE(interestName << " FibEntry has been removed since timeout scheduling");
293  return;
294  }
295 
296  FaceInfo* fiPtr = namespaceInfo->getFaceInfo(faceId);
297  if (fiPtr == nullptr) {
298  NFD_LOG_TRACE(interestName << " FaceInfo id=" << faceId << " has been removed since timeout scheduling");
299  return;
300  }
301 
302  auto& faceInfo = *fiPtr;
303  size_t nTimeouts = faceInfo.getNTimeouts() + 1;
304  faceInfo.setNTimeouts(nTimeouts);
305 
306  if (nTimeouts < m_nMaxTimeouts && !isNack) {
307  NFD_LOG_TRACE(interestName << " face=" << faceId << " timeout-count=" << nTimeouts << " ignoring");
308  // Extend lifetime for measurements associated with Face
309  namespaceInfo->extendFaceInfoLifetime(faceInfo, faceId);
310  faceInfo.cancelTimeout(interestName);
311  }
312  else {
313  NFD_LOG_TRACE(interestName << " face=" << faceId << " timeout-count=" << nTimeouts);
314  faceInfo.recordTimeout(interestName);
315  }
316 }
317 
318 void
319 AsfStrategy::sendNoRouteNack(Face& face, const shared_ptr<pit::Entry>& pitEntry)
320 {
321  lp::NackHeader nackHeader;
322  nackHeader.setReason(lp::NackReason::NO_ROUTE);
323  this->sendNack(nackHeader, face, pitEntry);
324  this->rejectPendingInterest(pitEntry);
325 }
326 
327 } // namespace nfd::fw::asf
This file contains common algorithms used by forwarding strategies.
Represents a face-endpoint pair in the forwarder.
Main class of NFD's forwarding engine.
Definition: forwarder.hpp:54
Generalization of a network interface.
Definition: face.hpp:56
FaceId getId() const noexcept
Returns the face ID.
Definition: face.hpp:121
Represents an entry in the FIB.
Definition: fib-entry.hpp:54
const Name & getPrefix() const
Definition: fib-entry.hpp:60
static std::unique_ptr< RetxSuppressionExponential > construct(const StrategyParameters &params)
Base class of all forwarding strategies.
Definition: strategy.hpp:42
void rejectPendingInterest(const shared_ptr< pit::Entry > &pitEntry)
Schedule the PIT entry for immediate deletion.
Definition: strategy.hpp:317
const fib::Entry & lookupFib(const pit::Entry &pitEntry) const
Performs a FIB lookup, considering Link object if present.
Definition: strategy.cpp:304
bool sendNack(const lp::NackHeader &header, Face &egress, const shared_ptr< pit::Entry > &pitEntry)
Send a Nack packet.
Definition: strategy.hpp:333
pit::OutRecord * sendInterest(const Interest &interest, Face &egress, const shared_ptr< pit::Entry > &pitEntry)
Send an Interest packet.
Definition: strategy.cpp:226
static ParsedInstanceName parseInstanceName(const Name &input)
Parse a strategy instance name.
Definition: strategy.cpp:123
void setExpiryTimer(const shared_ptr< pit::Entry > &pitEntry, time::milliseconds duration)
Schedule the PIT entry to be erased after duration.
Definition: strategy.hpp:353
void setInstanceName(const Name &name) noexcept
Set strategy instance name.
Definition: strategy.hpp:415
static StrategyParameters parseParameters(const PartialName &params)
Parse strategy parameters encoded in a strategy instance name.
Definition: strategy.cpp:144
static Name makeInstanceName(const Name &input, const Name &strategyName)
Construct a strategy instance name.
Definition: strategy.cpp:134
std::enable_if_t< std::is_signed_v< T >, T > getOrDefault(const key_type &key, const T &defaultVal) const
Definition: strategy.hpp:456
NamespaceInfo & getOrCreateNamespaceInfo(const fib::Entry &fibEntry, const Name &prefix)
FaceInfo & getOrCreateFaceInfo(const fib::Entry &fibEntry, const Name &interestName, FaceId faceId)
FaceInfo * getFaceInfo(const fib::Entry &fibEntry, const Name &interestName, FaceId faceId)
NamespaceInfo * getNamespaceInfo(const Name &prefix)
Adaptive SRTT-based forwarding strategy.
static const Name & getStrategyName()
void afterReceiveNack(const lp::Nack &nack, const FaceEndpoint &ingress, const shared_ptr< pit::Entry > &pitEntry) override
Trigger after a Nack is received.
AsfStrategy(Forwarder &forwarder, const Name &name=getStrategyName())
void afterReceiveInterest(const Interest &interest, const FaceEndpoint &ingress, const shared_ptr< pit::Entry > &pitEntry) override
Trigger after an Interest is received.
void beforeSatisfyInterest(const Data &data, const FaceEndpoint &ingress, const shared_ptr< pit::Entry > &pitEntry) override
Trigger before a PIT entry is satisfied.
Strategy information for each face in a namespace.
static constexpr time::nanoseconds RTT_TIMEOUT
void cancelTimeout(const Name &prefix)
static constexpr time::nanoseconds RTT_NO_MEASUREMENT
time::nanoseconds getSrtt() const
void setNTimeouts(size_t nTimeouts)
time::nanoseconds getLastRtt() const
void recordTimeout(const Name &interestName)
void recordRtt(time::nanoseconds rtt)
time::nanoseconds scheduleTimeout(const Name &interestName, scheduler::EventCallback cb)
Stores strategy information about each face in this namespace.
FaceInfo * getFaceInfo(FaceId faceId)
void extendFaceInfoLifetime(FaceInfo &info, FaceId faceId)
Face * getFaceToProbe(const Face &inFace, const Interest &interest, const fib::Entry &fibEntry, const Face &faceUsed)
bool isProbingNeeded(const fib::Entry &fibEntry, const Name &interestName)
time::milliseconds getProbingInterval() const
void afterForwardingProbe(const fib::Entry &fibEntry, const Name &interestName)
void setProbingInterval(time::milliseconds probingInterval)
Contains information about an Interest toward an outgoing face.
#define NFD_LOG_INIT(name)
Definition: logger.hpp:31
#define NFD_LOG_DEBUG
Definition: logger.hpp:38
#define NFD_LOG_TRACE
Definition: logger.hpp:37
uint64_t FaceId
Identifies a face.
Definition: face-common.hpp:47
@ SUPPRESS
Interest is retransmission and should be suppressed.
@ FORWARD
Interest is retransmission and should be forwarded.
bool isNextHopEligible(const Face &inFace, const Interest &interest, const fib::NextHop &nexthop, const shared_ptr< pit::Entry > &pitEntry, bool wantUnused, time::steady_clock::time_point now)
Determines whether a NextHop is eligible, i.e., not the same inFace.
Definition: algorithm.cpp:130
fib::NextHopList::const_iterator findEligibleNextHopWithEarliestOutRecord(const Face &inFace, const Interest &interest, const fib::NextHopList &nexthops, const shared_ptr< pit::Entry > &pitEntry)
Pick an eligible NextHop with earliest out-record.
Definition: algorithm.cpp:108
NFD_REGISTER_STRATEGY(SelfLearningStrategy)
bool hasPendingOutRecords(const pit::Entry &pitEntry)
Determine whether pitEntry has any pending out-records.
Definition: algorithm.cpp:84
std::optional< uint64_t > version
The strategy version number, if present.
Definition: strategy.hpp:387
PartialName parameters
Parameter components, may be empty.
Definition: strategy.hpp:388