asf-strategy.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2019, Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of NFD (Named Data Networking Forwarding Daemon).
12  * See AUTHORS.md for complete list of NFD authors and contributors.
13  *
14  * NFD is free software: you can redistribute it and/or modify it under the terms
15  * of the GNU General Public License as published by the Free Software Foundation,
16  * either version 3 of the License, or (at your option) any later version.
17  *
18  * NFD is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
19  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
20  * PURPOSE. See the GNU General Public License for more details.
21  *
22  * You should have received a copy of the GNU General Public License along with
23  * NFD, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
24  */
25 
26 #include "asf-strategy.hpp"
27 #include "algorithm.hpp"
28 #include "common/global.hpp"
29 #include "common/logger.hpp"
30 
31 namespace nfd {
32 namespace fw {
33 namespace asf {
34 
35 NFD_LOG_INIT(AsfStrategy);
36 NFD_REGISTER_STRATEGY(AsfStrategy);
37 
38 const time::milliseconds AsfStrategy::RETX_SUPPRESSION_INITIAL(10);
39 const time::milliseconds AsfStrategy::RETX_SUPPRESSION_MAX(250);
40 
41 AsfStrategy::AsfStrategy(Forwarder& forwarder, const Name& name)
42  : Strategy(forwarder)
43  , m_measurements(getMeasurements())
44  , m_probing(m_measurements)
45  , m_maxSilentTimeouts(0)
46  , m_retxSuppression(RETX_SUPPRESSION_INITIAL,
47  RetxSuppressionExponential::DEFAULT_MULTIPLIER,
48  RETX_SUPPRESSION_MAX)
49 {
51  if (!parsed.parameters.empty()) {
52  processParams(parsed.parameters);
53  }
54 
55  if (parsed.version && *parsed.version != getStrategyName()[-1].toVersion()) {
56  NDN_THROW(std::invalid_argument(
57  "AsfStrategy does not support version " + to_string(*parsed.version)));
58  }
60 
61  NFD_LOG_DEBUG("Probing interval=" << m_probing.getProbingInterval()
62  << ", Num silent timeouts=" << m_maxSilentTimeouts);
63 }
64 
65 const Name&
67 {
68  static Name strategyName("/localhost/nfd/strategy/asf/%FD%03");
69  return strategyName;
70 }
71 
72 void
73 AsfStrategy::processParams(const PartialName& parsed)
74 {
75  for (const auto& component : parsed) {
76  std::string parsedStr(reinterpret_cast<const char*>(component.value()), component.value_size());
77  auto n = parsedStr.find("~");
78  if (n == std::string::npos) {
79  NDN_THROW(std::invalid_argument("Format is <parameter>~<value>"));
80  }
81 
82  auto f = parsedStr.substr(0, n);
83  auto s = parsedStr.substr(n + 1);
84  if (f == "probing-interval") {
85  m_probing.setProbingInterval(getParamValue(f, s));
86  }
87  else if (f == "n-silent-timeouts") {
88  m_maxSilentTimeouts = getParamValue(f, s);
89  }
90  else {
91  NDN_THROW(std::invalid_argument("Parameter should be probing-interval or n-silent-timeouts"));
92  }
93  }
94 }
95 
96 uint64_t
97 AsfStrategy::getParamValue(const std::string& param, const std::string& value)
98 {
99  try {
100  if (!value.empty() && value[0] == '-')
101  NDN_THROW(boost::bad_lexical_cast());
102 
103  return boost::lexical_cast<uint64_t>(value);
104  }
105  catch (const boost::bad_lexical_cast&) {
106  NDN_THROW(std::invalid_argument("Value of " + param + " must be a non-negative integer"));
107  }
108 }
109 
110 void
111 AsfStrategy::sendAsfProbe(const FaceEndpoint& ingress, const Interest& interest,
112  const shared_ptr<pit::Entry>& pitEntry, const Face& faceToUse,
113  const fib::Entry& fibEntry)
114 {
115  Face* faceToProbe = m_probing.getFaceToProbe(ingress.face, interest, fibEntry, faceToUse);
116  if (faceToProbe != nullptr) {
117  forwardInterest(interest, fibEntry, pitEntry, *faceToProbe, true);
118  m_probing.afterForwardingProbe(fibEntry, interest);
119  }
120 }
121 
122 void
123 AsfStrategy::afterReceiveInterest(const FaceEndpoint& ingress, const Interest& interest,
124  const shared_ptr<pit::Entry>& pitEntry)
125 {
126  // Should the Interest be suppressed?
127  RetxSuppressionResult suppressResult = m_retxSuppression.decidePerPitEntry(*pitEntry);
128  if (suppressResult == RetxSuppressionResult::SUPPRESS) {
129  NFD_LOG_DEBUG(interest << " from=" << ingress << " suppressed");
130  return;
131  }
132 
133  const fib::Entry& fibEntry = this->lookupFib(*pitEntry);
134  const fib::NextHopList& nexthops = fibEntry.getNextHops();
135 
136  if (suppressResult == RetxSuppressionResult::NEW) {
137  if (nexthops.size() == 0) {
138  // send noRouteNack if nexthop is not available
139  sendNoRouteNack(ingress, interest, pitEntry);
140  this->rejectPendingInterest(pitEntry);
141  return;
142  }
143 
144  Face* faceToUse = getBestFaceForForwarding(fibEntry, interest, ingress.face, pitEntry);
145 
146  if (faceToUse == nullptr) {
147  sendNoRouteNack(ingress, interest, pitEntry);
148  this->rejectPendingInterest(pitEntry);
149  return;
150  }
151 
152  NFD_LOG_TRACE("Forwarding interest to face: " << faceToUse->getId());
153  forwardInterest(interest, fibEntry, pitEntry, *faceToUse);
154 
155  // If necessary, send probe
156  if (m_probing.isProbingNeeded(fibEntry, interest)) {
157  sendAsfProbe(ingress, interest, pitEntry, *faceToUse, fibEntry);
158  }
159  return;
160  }
161 
162  Face* faceToUse = getBestFaceForForwarding(fibEntry, interest, ingress.face, pitEntry, false);
163  // if unused face not found, select nexthop with earliest out record
164  if (faceToUse != nullptr) {
165 
166  NFD_LOG_TRACE("Forwarding interest to face: " << faceToUse->getId());
167  forwardInterest(interest, fibEntry, pitEntry, *faceToUse);
168  // avoid probing in case of forwarding
169  return;
170  }
171 
172  // find an eligible upstream that is used earliest
173  auto it = nexthops.end();
174  it = findEligibleNextHopWithEarliestOutRecord(ingress.face, interest, nexthops, pitEntry);
175  if (it == nexthops.end()) {
176  NFD_LOG_DEBUG(interest << " from=" << ingress << " retransmitNoNextHop");
177  }
178  else {
179  auto egress = FaceEndpoint(it->getFace(), 0);
180  NFD_LOG_DEBUG(interest << " from=" << ingress << " retransmit-retry-to=" << egress);
181  this->sendInterest(pitEntry, egress, interest);
182  }
183 }
184 
185 void
186 AsfStrategy::beforeSatisfyInterest(const shared_ptr<pit::Entry>& pitEntry,
187  const FaceEndpoint& ingress, const Data& data)
188 {
189  NamespaceInfo* namespaceInfo = m_measurements.getNamespaceInfo(pitEntry->getName());
190 
191  if (namespaceInfo == nullptr) {
192  NFD_LOG_TRACE("Could not find measurements entry for " << pitEntry->getName());
193  return;
194  }
195 
196  // Record the RTT between the Interest out to Data in
197  FaceInfo* faceInfo = namespaceInfo->get(ingress.face.getId());
198  if (faceInfo == nullptr) {
199  return;
200  }
201  faceInfo->recordRtt(pitEntry, ingress.face);
202 
203  // Extend lifetime for measurements associated with Face
204  namespaceInfo->extendFaceInfoLifetime(*faceInfo, ingress.face.getId());
205 
206  if (faceInfo->isTimeoutScheduled()) {
207  faceInfo->cancelTimeoutEvent(data.getName());
208  }
209 }
210 
211 void
212 AsfStrategy::afterReceiveNack(const FaceEndpoint& ingress, const lp::Nack& nack,
213  const shared_ptr<pit::Entry>& pitEntry)
214 {
215  NFD_LOG_DEBUG("Nack for " << nack.getInterest() << " from=" << ingress << ": reason=" << nack.getReason());
216  onTimeout(pitEntry->getName(), ingress.face.getId());
217 }
218 
221 
222 void
223 AsfStrategy::forwardInterest(const Interest& interest,
224  const fib::Entry& fibEntry,
225  const shared_ptr<pit::Entry>& pitEntry,
226  Face& outFace,
227  bool wantNewNonce)
228 {
229  auto egress = FaceEndpoint(outFace, 0);
230  if (wantNewNonce) {
231  //Send probe: interest with new Nonce
232  Interest probeInterest(interest);
233  probeInterest.refreshNonce();
234  NFD_LOG_TRACE("Sending probe for " << probeInterest << probeInterest.getNonce()
235  << " to: " << egress);
236  this->sendInterest(pitEntry, egress, probeInterest);
237  }
238  else {
239  this->sendInterest(pitEntry, egress, interest);
240  }
241 
242  FaceInfo& faceInfo = m_measurements.getOrCreateFaceInfo(fibEntry, interest, egress.face.getId());
243 
244  // Refresh measurements since Face is being used for forwarding
245  NamespaceInfo& namespaceInfo = m_measurements.getOrCreateNamespaceInfo(fibEntry, interest);
246  namespaceInfo.extendFaceInfoLifetime(faceInfo, egress.face.getId());
247 
248  if (!faceInfo.isTimeoutScheduled()) {
249  // Estimate and schedule timeout
250  RttEstimator::Duration timeout = faceInfo.computeRto();
251 
252  NFD_LOG_TRACE("Scheduling timeout for " << fibEntry.getPrefix() << " to: " << egress
253  << " in " << time::duration_cast<time::milliseconds>(timeout) << " ms");
254 
255  auto id = getScheduler().schedule(timeout, bind(&AsfStrategy::onTimeout, this,
256  interest.getName(), egress.face.getId()));
257  faceInfo.setTimeoutEvent(id, interest.getName());
258  }
259 }
260 
261 struct FaceStats
262 {
263  Face* face;
264  RttStats::Rtt rtt;
265  RttStats::Rtt srtt;
266  uint64_t cost;
267 };
268 
269 double
270 getValueForSorting(const FaceStats& stats)
271 {
272  // These values allow faces with no measurements to be ranked better than timeouts
273  // srtt < RTT_NO_MEASUREMENT < RTT_TIMEOUT
274  static const RttStats::Rtt SORTING_RTT_TIMEOUT = time::microseconds::max();
275  static const RttStats::Rtt SORTING_RTT_NO_MEASUREMENT = SORTING_RTT_TIMEOUT / 2;
276 
277  if (stats.rtt == RttStats::RTT_TIMEOUT) {
278  return SORTING_RTT_TIMEOUT.count();
279  }
280  else if (stats.rtt == RttStats::RTT_NO_MEASUREMENT) {
281  return SORTING_RTT_NO_MEASUREMENT.count();
282  }
283  else {
284  return stats.srtt.count();
285  }
286 }
287 
288 Face*
289 AsfStrategy::getBestFaceForForwarding(const fib::Entry& fibEntry, const Interest& interest,
290  const Face& inFace, const shared_ptr<pit::Entry>& pitEntry,
291  bool isInterestNew)
292 {
293  NFD_LOG_TRACE("Looking for best face for " << fibEntry.getPrefix());
294 
295  typedef std::function<bool(const FaceStats&, const FaceStats&)> FaceStatsPredicate;
296  typedef std::set<FaceStats, FaceStatsPredicate> FaceStatsSet;
297 
298  FaceStatsSet rankedFaces(
299  [] (const FaceStats& lhs, const FaceStats& rhs) -> bool {
300  // Sort by RTT and then by cost
301  double lhsValue = getValueForSorting(lhs);
302  double rhsValue = getValueForSorting(rhs);
303 
304  if (lhsValue < rhsValue) {
305  return true;
306  }
307  else if (lhsValue == rhsValue) {
308  return lhs.cost < rhs.cost;
309  }
310  else {
311  return false;
312  }
313  });
314 
315  auto now = time::steady_clock::now();
316  for (const fib::NextHop& hop : fibEntry.getNextHops()) {
317  Face& hopFace = hop.getFace();
318 
319  if (!isNextHopEligible(inFace, interest, hop, pitEntry, !isInterestNew, now)) {
320  continue;
321  }
322 
323  FaceInfo* info = m_measurements.getFaceInfo(fibEntry, interest, hopFace.getId());
324 
325  if (info == nullptr) {
326  FaceStats stats = {&hopFace,
329  hop.getCost()};
330 
331  rankedFaces.insert(stats);
332  }
333  else {
334  FaceStats stats = {&hopFace, info->getRtt(), info->getSrtt(), hop.getCost()};
335  rankedFaces.insert(stats);
336  }
337  }
338 
339  FaceStatsSet::iterator it = rankedFaces.begin();
340 
341  if (it != rankedFaces.end()) {
342  return it->face;
343  }
344  else {
345  return nullptr;
346  }
347 }
348 
349 void
350 AsfStrategy::onTimeout(const Name& interestName, const face::FaceId faceId)
351 {
352  NamespaceInfo* namespaceInfo = m_measurements.getNamespaceInfo(interestName);
353 
354  if (namespaceInfo == nullptr) {
355  NFD_LOG_TRACE("FibEntry for " << interestName << " has been removed since timeout scheduling");
356  return;
357  }
358 
359  FaceInfoTable::iterator it = namespaceInfo->find(faceId);
360 
361  if (it == namespaceInfo->end()) {
362  it = namespaceInfo->insert(faceId);
363  }
364 
365  FaceInfo& faceInfo = it->second;
366 
367  faceInfo.setNSilentTimeouts(faceInfo.getNSilentTimeouts() + 1);
368 
369  if (faceInfo.getNSilentTimeouts() <= m_maxSilentTimeouts) {
370  NFD_LOG_TRACE("FaceId " << faceId << " for " << interestName << " has timed-out "
371  << faceInfo.getNSilentTimeouts() << " time(s), ignoring");
372  // Extend lifetime for measurements associated with Face
373  namespaceInfo->extendFaceInfoLifetime(faceInfo, faceId);
374 
375  if (faceInfo.isTimeoutScheduled()) {
376  faceInfo.cancelTimeoutEvent(interestName);
377  }
378  }
379  else {
380  NFD_LOG_TRACE("FaceId " << faceId << " for " << interestName << " has timed-out");
381  faceInfo.recordTimeout(interestName);
382  }
383 }
384 
385 void
386 AsfStrategy::sendNoRouteNack(const FaceEndpoint& ingress, const Interest& interest,
387  const shared_ptr<pit::Entry>& pitEntry)
388 {
389  NFD_LOG_DEBUG(interest << " from=" << ingress << " noNextHop");
390 
391  lp::NackHeader nackHeader;
392  nackHeader.setReason(lp::NackReason::NO_ROUTE);
393  this->sendNack(pitEntry, ingress, nackHeader);
394 }
395 
396 } // namespace asf
397 } // namespace fw
398 } // namespace nfd
Main class of NFD forwarding engine.
Definition: forwarder.hpp:51
void setInstanceName(const Name &name)
set strategy instance name
Definition: strategy.hpp:369
void afterForwardingProbe(const fib::Entry &fibEntry, const Interest &interest)
void extendFaceInfoLifetime(FaceInfo &info, FaceId faceId)
RttStats::Rtt getRtt() const
static const Name & getStrategyName()
NamespaceInfo * getNamespaceInfo(const Name &prefix)
void sendNack(const shared_ptr< pit::Entry > &pitEntry, const FaceEndpoint &egress, const lp::NackHeader &header)
send Nack to egress
Definition: strategy.hpp:289
represents a FIB entry
Definition: fib-entry.hpp:51
fib::NextHopList::const_iterator findEligibleNextHopWithEarliestOutRecord(const Face &inFace, const Interest &interest, const fib::NextHopList &nexthops, const shared_ptr< pit::Entry > &pitEntry)
pick an eligible NextHop with earliest out-record
Definition: algorithm.cpp:132
NFD_REGISTER_STRATEGY(AsfStrategy)
#define NFD_LOG_TRACE
Definition: logger.hpp:37
void beforeSatisfyInterest(const shared_ptr< pit::Entry > &pitEntry, const FaceEndpoint &ingress, const Data &data) override
trigger before PIT entry is satisfied
FaceInfo & getOrCreateFaceInfo(const fib::Entry &fibEntry, const Interest &interest, FaceId faceId)
Interest is retransmission and should be suppressed.
FaceInfo * get(FaceId faceId)
static Name makeInstanceName(const Name &input, const Name &strategyName)
construct a strategy instance name
Definition: strategy.cpp:132
const FaceInfoTable::iterator insert(FaceId faceId)
RttStats::Rtt getSrtt() const
time::duration< double, boost::micro > Rtt
FaceInfoTable::iterator end()
time::microseconds Duration
Scheduler & getScheduler()
Returns the global Scheduler instance for the calling thread.
Definition: global.cpp:45
void sendInterest(const shared_ptr< pit::Entry > &pitEntry, const FaceEndpoint &egress, const Interest &interest)
send Interest to egress
Definition: strategy.hpp:243
Table::const_iterator iterator
Definition: cs-internal.hpp:41
void cancelTimeoutEvent(const Name &prefix)
FaceInfoTable::iterator find(FaceId faceId)
void afterReceiveNack(const FaceEndpoint &ingress, const lp::Nack &nack, const shared_ptr< pit::Entry > &pitEntry) override
trigger after Nack is received
const Name & getPrefix() const
Definition: fib-entry.hpp:58
Represents a face-endpoint pair in the forwarder.
Copyright (c) 2014-2015, Regents of the University of California, Arizona Board of Regents...
Definition: algorithm.hpp:32
Interest is new (not a retransmission)
bool isNextHopEligible(const Face &inFace, const Interest &interest, const fib::NextHop &nexthop, const shared_ptr< pit::Entry > &pitEntry, bool wantUnused, time::steady_clock::TimePoint now)
determines whether a NextHop is eligible i.e.
Definition: algorithm.cpp:154
a retransmission suppression decision algorithm that suppresses retransmissions using exponential bac...
NamespaceInfo & getOrCreateNamespaceInfo(const fib::Entry &fibEntry, const Interest &interest)
bool isProbingNeeded(const fib::Entry &fibEntry, const Interest &interest)
Represents a collection of nexthops.
PartialName parameters
parameter components
Definition: strategy.hpp:342
bool isTimeoutScheduled() const
void recordRtt(const shared_ptr< pit::Entry > &pitEntry, const Face &inFace)
time::milliseconds getProbingInterval() const
stores stategy information about each face in this namespace
RetxSuppressionResult decidePerPitEntry(pit::Entry &pitEntry)
determines whether Interest is a retransmission per pit entry and if so, whether it shall be forwarde...
represents a forwarding strategy
Definition: strategy.hpp:37
void recordTimeout(const Name &interestName)
This file contains common algorithms used by forwarding strategies.
#define NFD_LOG_DEBUG
Definition: logger.hpp:38
void setTimeoutEvent(const scheduler::EventId &id, const Name &interestName)
#define NFD_LOG_INIT(name)
Definition: logger.hpp:31
static const Rtt RTT_TIMEOUT
static ParsedInstanceName parseInstanceName(const Name &input)
parse a strategy instance name
Definition: strategy.cpp:121
void setProbingInterval(size_t probingInterval)
size_t getNSilentTimeouts() const
double getValueForSorting(const FaceStats &stats)
Face * getFaceToProbe(const Face &inFace, const Interest &interest, const fib::Entry &fibEntry, const Face &faceUsed)
uint64_t FaceId
identifies a face
Definition: face.hpp:39
void afterReceiveInterest(const FaceEndpoint &ingress, const Interest &interest, const shared_ptr< pit::Entry > &pitEntry) override
trigger after Interest is received
RttEstimator::Duration computeRto() const
Represents a nexthop record in a FIB entry.
Definition: fib-nexthop.hpp:37
const NextHopList & getNextHops() const
Definition: fib-entry.hpp:64
void setNSilentTimeouts(size_t nSilentTimeouts)
void rejectPendingInterest(const shared_ptr< pit::Entry > &pitEntry)
schedule the PIT entry for immediate deletion
Definition: strategy.hpp:276
const fib::Entry & lookupFib(const pit::Entry &pitEntry) const
performs a FIB lookup, considering Link object if present
Definition: strategy.cpp:255
Strategy information for each face in a namespace.
AsfStrategy(Forwarder &forwarder, const Name &name=getStrategyName())
static const Rtt RTT_NO_MEASUREMENT
optional< uint64_t > version
whether strategyName contains a version component
Definition: strategy.hpp:341