segment-fetcher.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2013-2021 Regents of the University of California,
4  * Colorado State University,
5  * University Pierre & Marie Curie, Sorbonne University.
6  *
7  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
8  *
9  * ndn-cxx library is free software: you can redistribute it and/or modify it under the
10  * terms of the GNU Lesser General Public License as published by the Free Software
11  * Foundation, either version 3 of the License, or (at your option) any later version.
12  *
13  * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
14  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
15  * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
16  *
17  * You should have received copies of the GNU General Public License and GNU Lesser
18  * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
19  * <http://www.gnu.org/licenses/>.
20  *
21  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
22  */
23 
27 #include "ndn-cxx/lp/nack.hpp"
29 
30 #include <boost/asio/io_service.hpp>
31 #include <boost/lexical_cast.hpp>
32 #include <boost/range/adaptor/map.hpp>
33 
34 #include <cmath>
35 
36 namespace ndn {
37 namespace util {
38 
39 constexpr double SegmentFetcher::MIN_SSTHRESH;
40 
41 void
43 {
44  if (maxTimeout < 1_ms) {
45  NDN_THROW(std::invalid_argument("maxTimeout must be greater than or equal to 1 millisecond"));
46  }
47 
48  if (initCwnd < 1.0) {
49  NDN_THROW(std::invalid_argument("initCwnd must be greater than or equal to 1"));
50  }
51 
52  if (aiStep < 0.0) {
53  NDN_THROW(std::invalid_argument("aiStep must be greater than or equal to 0"));
54  }
55 
56  if (mdCoef < 0.0 || mdCoef > 1.0) {
57  NDN_THROW(std::invalid_argument("mdCoef must be in range [0, 1]"));
58  }
59 }
60 
61 SegmentFetcher::SegmentFetcher(Face& face,
62  security::Validator& validator,
63  const SegmentFetcher::Options& options)
64  : m_options(options)
65  , m_face(face)
66  , m_scheduler(m_face.getIoService())
67  , m_validator(validator)
68  , m_rttEstimator(make_shared<RttEstimator::Options>(options.rttOptions))
69  , m_timeLastSegmentReceived(time::steady_clock::now())
70  , m_cwnd(options.initCwnd)
71  , m_ssthresh(options.initSsthresh)
72 {
73  m_options.validate();
74 }
75 
76 shared_ptr<SegmentFetcher>
78  const Interest& baseInterest,
79  security::Validator& validator,
80  const SegmentFetcher::Options& options)
81 {
82  shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, options));
83  fetcher->m_this = fetcher;
84  fetcher->fetchFirstSegment(baseInterest, false);
85  return fetcher;
86 }
87 
88 void
90 {
91  if (!m_this) {
92  return;
93  }
94 
95  m_pendingSegments.clear(); // cancels pending Interests and timeout events
96  m_face.getIoService().post([self = std::move(m_this)] {});
97 }
98 
99 bool
100 SegmentFetcher::shouldStop(const weak_ptr<SegmentFetcher>& weakSelf)
101 {
102  auto self = weakSelf.lock();
103  return self == nullptr || self->m_this == nullptr;
104 }
105 
106 void
107 SegmentFetcher::fetchFirstSegment(const Interest& baseInterest, bool isRetransmission)
108 {
109  Interest interest(baseInterest);
110  interest.setCanBePrefix(true);
111  interest.setMustBeFresh(true);
112  interest.setInterestLifetime(m_options.interestLifetime);
113  if (isRetransmission) {
114  interest.refreshNonce();
115  }
116 
117  sendInterest(0, interest, isRetransmission);
118 }
119 
120 void
121 SegmentFetcher::fetchSegmentsInWindow(const Interest& origInterest)
122 {
123  if (checkAllSegmentsReceived()) {
124  // All segments have been retrieved
125  return finalizeFetch();
126  }
127 
128  int64_t availableWindowSize;
129  if (m_options.inOrder) {
130  availableWindowSize = std::min<int64_t>(m_cwnd, m_options.flowControlWindow - m_segmentBuffer.size());
131  }
132  else {
133  availableWindowSize = static_cast<int64_t>(m_cwnd);
134  }
135  availableWindowSize -= m_nSegmentsInFlight;
136 
137  std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // The boolean indicates whether a retx or not
138 
139  while (availableWindowSize > 0) {
140  if (!m_retxQueue.empty()) {
141  auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
142  m_retxQueue.pop();
143  if (pendingSegmentIt == m_pendingSegments.end()) {
144  // Skip re-requesting this segment, since it was received after RTO timeout
145  continue;
146  }
147  BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
148  segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
149  }
150  else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
151  if (m_segmentBuffer.count(m_nextSegmentNum) > 0) {
152  // Don't request a segment a second time if received in response to first "discovery" Interest
153  m_nextSegmentNum++;
154  continue;
155  }
156  segmentsToRequest.emplace_back(m_nextSegmentNum++, false);
157  }
158  else {
159  break;
160  }
161  availableWindowSize--;
162  }
163 
164  for (const auto& segment : segmentsToRequest) {
165  Interest interest(origInterest); // to preserve Interest elements
166  interest.setName(Name(m_versionedDataName).appendSegment(segment.first));
167  interest.setCanBePrefix(false);
168  interest.setMustBeFresh(false);
169  interest.setInterestLifetime(m_options.interestLifetime);
170  interest.refreshNonce();
171  sendInterest(segment.first, interest, segment.second);
172  }
173 }
174 
175 void
176 SegmentFetcher::sendInterest(uint64_t segNum, const Interest& interest, bool isRetransmission)
177 {
178  weak_ptr<SegmentFetcher> weakSelf = m_this;
179 
180  ++m_nSegmentsInFlight;
181  auto pendingInterest = m_face.expressInterest(interest,
182  [this, weakSelf] (const Interest& interest, const Data& data) {
183  afterSegmentReceivedCb(interest, data, weakSelf);
184  },
185  [this, weakSelf] (const Interest& interest, const lp::Nack& nack) {
186  afterNackReceivedCb(interest, nack, weakSelf);
187  },
188  nullptr);
189 
190  auto timeout = m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto();
191  auto timeoutEvent = m_scheduler.schedule(timeout, [this, interest, weakSelf] {
192  afterTimeoutCb(interest, weakSelf);
193  });
194 
195  if (isRetransmission) {
196  updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
197  return;
198  }
199 
200  PendingSegment pendingSegment{SegmentState::FirstInterest, time::steady_clock::now(),
201  pendingInterest, timeoutEvent};
202  bool isNew = m_pendingSegments.emplace(segNum, std::move(pendingSegment)).second;
203  BOOST_VERIFY(isNew);
204  m_highInterest = segNum;
205 }
206 
207 void
208 SegmentFetcher::afterSegmentReceivedCb(const Interest& origInterest, const Data& data,
209  const weak_ptr<SegmentFetcher>& weakSelf)
210 {
211  if (shouldStop(weakSelf))
212  return;
213 
214  BOOST_ASSERT(m_nSegmentsInFlight > 0);
215  m_nSegmentsInFlight--;
216 
217  name::Component currentSegmentComponent = data.getName().get(-1);
218  if (!currentSegmentComponent.isSegment()) {
219  return signalError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number");
220  }
221 
222  uint64_t currentSegment = currentSegmentComponent.toSegment();
223 
224  // The first received Interest could have any segment ID
225  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
226  if (m_receivedSegments.size() > 0) {
227  pendingSegmentIt = m_pendingSegments.find(currentSegment);
228  }
229  else {
230  pendingSegmentIt = m_pendingSegments.begin();
231  }
232 
233  if (pendingSegmentIt == m_pendingSegments.end()) {
234  return;
235  }
236 
237  pendingSegmentIt->second.timeoutEvent.cancel();
238 
239  afterSegmentReceived(data);
240 
241  m_validator.validate(data,
242  [=] (const Data& d) { afterValidationSuccess(d, origInterest, pendingSegmentIt, weakSelf); },
243  [=] (const Data& d, const auto& error) { afterValidationFailure(d, error, weakSelf); });
244 }
245 
246 void
247 SegmentFetcher::afterValidationSuccess(const Data& data, const Interest& origInterest,
248  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
249  const weak_ptr<SegmentFetcher>& weakSelf)
250 {
251  if (shouldStop(weakSelf))
252  return;
253 
254  // We update the last receive time here instead of in the segment received callback so that the
255  // transfer will not fail to terminate if we only received invalid Data packets.
256  m_timeLastSegmentReceived = time::steady_clock::now();
257 
258  m_nReceived++;
259 
260  // It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
261  uint64_t currentSegment = data.getName().get(-1).toSegment();
262  m_receivedSegments.insert(currentSegment);
263 
264  // Add measurement to RTO estimator (if not retransmission)
265  if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
266  BOOST_ASSERT(m_nSegmentsInFlight >= 0);
267  m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
268  static_cast<size_t>(m_nSegmentsInFlight) + 1);
269  }
270 
271  // Remove from pending segments map
272  m_pendingSegments.erase(pendingSegmentIt);
273 
274  // Copy data in segment to temporary buffer
275  auto receivedSegmentIt = m_segmentBuffer.emplace(std::piecewise_construct,
276  std::forward_as_tuple(currentSegment),
277  std::forward_as_tuple(data.getContent().value_size()));
278  std::copy(data.getContent().value_begin(), data.getContent().value_end(),
279  receivedSegmentIt.first->second.begin());
280  m_nBytesReceived += data.getContent().value_size();
281  afterSegmentValidated(data);
282 
283  if (data.getFinalBlock()) {
284  if (!data.getFinalBlock()->isSegment()) {
285  return signalError(FINALBLOCKID_NOT_SEGMENT,
286  "Received FinalBlockId did not contain a segment component");
287  }
288 
289  if (data.getFinalBlock()->toSegment() + 1 != static_cast<uint64_t>(m_nSegments)) {
290  m_nSegments = data.getFinalBlock()->toSegment() + 1;
291  cancelExcessInFlightSegments();
292  }
293  }
294 
295  if (m_options.inOrder && m_nextSegmentInOrder == currentSegment) {
296  do {
297  onInOrderData(std::make_shared<const Buffer>(m_segmentBuffer[m_nextSegmentInOrder]));
298  m_segmentBuffer.erase(m_nextSegmentInOrder++);
299  } while (m_segmentBuffer.count(m_nextSegmentInOrder) > 0);
300  }
301 
302  if (m_receivedSegments.size() == 1) {
303  m_versionedDataName = data.getName().getPrefix(-1);
304  if (currentSegment == 0) {
305  // We received the first segment in response, so we can increment the next segment number
306  m_nextSegmentNum++;
307  }
308  }
309 
310  if (m_highData < currentSegment) {
311  m_highData = currentSegment;
312  }
313 
314  if (data.getCongestionMark() > 0 && !m_options.ignoreCongMarks) {
315  windowDecrease();
316  }
317  else {
318  windowIncrease();
319  }
320 
321  fetchSegmentsInWindow(origInterest);
322 }
323 
324 void
325 SegmentFetcher::afterValidationFailure(const Data&,
326  const security::ValidationError& error,
327  const weak_ptr<SegmentFetcher>& weakSelf)
328 {
329  if (shouldStop(weakSelf))
330  return;
331 
332  signalError(SEGMENT_VALIDATION_FAIL, "Segment validation failed: " + boost::lexical_cast<std::string>(error));
333 }
334 
335 void
336 SegmentFetcher::afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
337  const weak_ptr<SegmentFetcher>& weakSelf)
338 {
339  if (shouldStop(weakSelf))
340  return;
341 
343 
344  BOOST_ASSERT(m_nSegmentsInFlight > 0);
345  m_nSegmentsInFlight--;
346 
347  switch (nack.getReason()) {
350  afterNackOrTimeout(origInterest);
351  break;
352  default:
353  signalError(NACK_ERROR, "Nack Error");
354  break;
355  }
356 }
357 
358 void
359 SegmentFetcher::afterTimeoutCb(const Interest& origInterest,
360  const weak_ptr<SegmentFetcher>& weakSelf)
361 {
362  if (shouldStop(weakSelf))
363  return;
364 
366 
367  BOOST_ASSERT(m_nSegmentsInFlight > 0);
368  m_nSegmentsInFlight--;
369  afterNackOrTimeout(origInterest);
370 }
371 
372 void
373 SegmentFetcher::afterNackOrTimeout(const Interest& origInterest)
374 {
375  if (time::steady_clock::now() >= m_timeLastSegmentReceived + m_options.maxTimeout) {
376  // Fail transfer due to exceeding the maximum timeout between the successful receipt of segments
377  return signalError(INTEREST_TIMEOUT, "Timeout exceeded");
378  }
379 
380  name::Component lastNameComponent = origInterest.getName().get(-1);
381  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
382  BOOST_ASSERT(m_pendingSegments.size() > 0);
383  if (lastNameComponent.isSegment()) {
384  BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.toSegment()) > 0);
385  pendingSegmentIt = m_pendingSegments.find(lastNameComponent.toSegment());
386  }
387  else { // First Interest
388  BOOST_ASSERT(m_pendingSegments.size() > 0);
389  pendingSegmentIt = m_pendingSegments.begin();
390  }
391 
392  // Cancel timeout event and set status to InRetxQueue
393  pendingSegmentIt->second.timeoutEvent.cancel();
394  pendingSegmentIt->second.state = SegmentState::InRetxQueue;
395 
396  m_rttEstimator.backoffRto();
397 
398  if (m_receivedSegments.size() == 0) {
399  // Resend first Interest (until maximum receive timeout exceeded)
400  fetchFirstSegment(origInterest, true);
401  }
402  else {
403  windowDecrease();
404  m_retxQueue.push(pendingSegmentIt->first);
405  fetchSegmentsInWindow(origInterest);
406  }
407 }
408 
409 void
410 SegmentFetcher::finalizeFetch()
411 {
412  if (m_options.inOrder) {
414  }
415  else {
416  // Combine segments into final buffer
417  OBufferStream buf;
418  // We may have received more segments than exist in the object.
419  BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
420 
421  for (int64_t i = 0; i < m_nSegments; i++) {
422  buf.write(m_segmentBuffer[i].get<const char>(), m_segmentBuffer[i].size());
423  }
424  onComplete(buf.buf());
425  }
426  stop();
427 }
428 
429 void
430 SegmentFetcher::windowIncrease()
431 {
432  if (m_options.useConstantCwnd) {
433  BOOST_ASSERT(m_cwnd == m_options.initCwnd);
434  return;
435  }
436 
437  if (m_cwnd < m_ssthresh) {
438  m_cwnd += m_options.aiStep; // additive increase
439  }
440  else {
441  m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
442  }
443 }
444 
445 void
446 SegmentFetcher::windowDecrease()
447 {
448  if (m_options.disableCwa || m_highData > m_recPoint) {
449  m_recPoint = m_highInterest;
450 
451  if (m_options.useConstantCwnd) {
452  BOOST_ASSERT(m_cwnd == m_options.initCwnd);
453  return;
454  }
455 
456  // Refer to RFC 5681, Section 3.1 for the rationale behind the code below
457  m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef); // multiplicative decrease
458  m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
459  }
460 }
461 
462 void
463 SegmentFetcher::signalError(uint32_t code, const std::string& msg)
464 {
465  onError(code, msg);
466  stop();
467 }
468 
469 void
470 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
471  const PendingInterestHandle& pendingInterest,
472  scheduler::EventId timeoutEvent)
473 {
474  auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
475  BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
476  BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
477  pendingSegmentIt->second.state = SegmentState::Retransmitted;
478  pendingSegmentIt->second.hdl = pendingInterest; // cancels previous pending Interest via scoped handle
479  pendingSegmentIt->second.timeoutEvent = timeoutEvent;
480 }
481 
482 void
483 SegmentFetcher::cancelExcessInFlightSegments()
484 {
485  for (auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
486  if (it->first >= static_cast<uint64_t>(m_nSegments)) {
487  it = m_pendingSegments.erase(it); // cancels pending Interest and timeout event
488  BOOST_ASSERT(m_nSegmentsInFlight > 0);
489  m_nSegmentsInFlight--;
490  }
491  else {
492  ++it;
493  }
494  }
495 }
496 
497 bool
498 SegmentFetcher::checkAllSegmentsReceived()
499 {
500  bool haveReceivedAllSegments = false;
501 
502  if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
503  haveReceivedAllSegments = true;
504  // Verify that all segments in window have been received. If not, send Interests for missing segments.
505  for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
506  if (m_receivedSegments.count(i) == 0) {
507  m_retxQueue.push(i);
508  haveReceivedAllSegments = false;
509  }
510  }
511  }
512 
513  return haveReceivedAllSegments;
514 }
515 
517 SegmentFetcher::getEstimatedRto()
518 {
519  // We don't want an Interest timeout greater than the maximum allowed timeout between the
520  // succesful receipt of segments
521  return std::min(m_options.maxTimeout,
522  time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
523 }
524 
525 } // namespace util
526 } // namespace ndn
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:91
boost::asio::io_service & getIoService()
Returns a reference to the io_service used by this face.
Definition: face.hpp:423
PendingInterestHandle expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express an Interest.
Definition: face.cpp:164
Represents an Interest packet.
Definition: interest.hpp:50
PartialName getPrefix(ssize_t nComponents) const
Returns a prefix of the name.
Definition: name.hpp:216
EventId schedule(time::nanoseconds after, EventCallback callback)
Schedule a one-time event after the specified delay.
Definition: scheduler.cpp:92
Interface for validating data and interest packets.
Definition: validator.hpp:62
void validate(const Data &data, const DataValidationSuccessCallback &successCb, const DataValidationFailureCallback &failureCb)
Asynchronously validate data.
Definition: validator.cpp:49
static time_point now() noexcept
Definition: time.cpp:80
RTT/RTO estimator.
void backoffRto()
Backoff RTO by a factor of Options::rtoBackoffMultiplier.
void addMeasurement(time::nanoseconds rtt, size_t nExpectedSamples=1)
Records a new RTT measurement.
time::nanoseconds getEstimatedRto() const
Returns the estimated RTO value.
bool useConstantInterestTimeout
if true, Interest timeout is kept at maxTimeout
double mdCoef
multiplicative decrease coefficient
double aiStep
additive increase step (in segments)
double initCwnd
initial congestion window size
bool disableCwa
disable Conservative Window Adaptation
bool resetCwndToInit
reduce cwnd to initCwnd when loss event occurs
time::milliseconds interestLifetime
lifetime of sent Interests - independent of Interest timeout
bool useConstantCwnd
if true, window size is kept at initCwnd
bool inOrder
true for 'in order' mode, false for 'block' mode
size_t flowControlWindow
maximum number of segments stored in the reorder buffer
bool ignoreCongMarks
disable window decrease after congestion mark received
time::milliseconds maxTimeout
maximum allowed time between successful receipt of segments
Utility class to fetch the latest version of a segmented object.
Signal< SegmentFetcher > onInOrderComplete
Emitted on successful retrieval of all segments in 'in order' mode.
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::Validator &validator, const Options &options=Options())
Initiates segment fetching.
Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emitted upon successful retrieval of the complete object (all segments).
Signal< SegmentFetcher, ConstBufferPtr > onInOrderData
Emitted after each data segment in segment order has been validated.
Signal< SegmentFetcher, Data > afterSegmentValidated
Emitted whenever a received data segment has been successfully validated.
@ INTEREST_TIMEOUT
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceed...
@ DATA_HAS_NO_SEGMENT
One of the retrieved Data packets lacked a segment number in the last Name component (excl....
@ FINALBLOCKID_NOT_SEGMENT
A received FinalBlockId did not contain a segment component.
@ SEGMENT_VALIDATION_FAIL
One of the retrieved segments failed user-provided validation.
@ NACK_ERROR
An unrecoverable Nack was received during retrieval.
Signal< SegmentFetcher > afterSegmentNacked
Emitted whenever an Interest for a data segment is nacked.
void stop()
Stops fetching.
Signal< SegmentFetcher, uint32_t, std::string > onError
Emitted when the retrieval could not be completed due to an error.
Signal< SegmentFetcher, Data > afterSegmentReceived
Emitted whenever a data segment received.
Signal< SegmentFetcher > afterSegmentTimedOut
Emitted whenever an Interest for a data segment times out.
#define NDN_THROW(e)
Definition: exception.hpp:61
boost::chrono::milliseconds milliseconds
Definition: time.hpp:48
@ Name
Definition: tlv.hpp:71
@ Data
Definition: tlv.hpp:69
@ Interest
Definition: tlv.hpp:68
Definition: data.cpp:25