segment-fetcher.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2013-2019 Regents of the University of California,
4  * Colorado State University,
5  * University Pierre & Marie Curie, Sorbonne University.
6  *
7  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
8  *
9  * ndn-cxx library is free software: you can redistribute it and/or modify it under the
10  * terms of the GNU Lesser General Public License as published by the Free Software
11  * Foundation, either version 3 of the License, or (at your option) any later version.
12  *
13  * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
14  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
15  * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
16  *
17  * You should have received copies of the GNU General Public License and GNU Lesser
18  * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
19  * <http://www.gnu.org/licenses/>.
20  *
21  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
22  *
23  * @author Shuo Yang
24  * @author Weiwei Liu
25  * @author Chavoosh Ghasemi
26  */
27 
31 #include "ndn-cxx/lp/nack.hpp"
33 
34 #include <boost/asio/io_service.hpp>
35 #include <boost/lexical_cast.hpp>
36 #include <boost/range/adaptor/map.hpp>
37 
38 #include <cmath>
39 
40 namespace ndn {
41 namespace util {
42 
// Out-of-class definition of the static constexpr data member. windowDecrease()
// passes MIN_SSTHRESH to std::max(), which takes its arguments by const reference,
// so a definition is needed in language modes where constexpr static members are
// not implicitly inline.
43 constexpr double SegmentFetcher::MIN_SSTHRESH;
44 
45 void
47 {
48  if (maxTimeout < 1_ms) {
49  NDN_THROW(std::invalid_argument("maxTimeout must be greater than or equal to 1 millisecond"));
50  }
51 
52  if (initCwnd < 1.0) {
53  NDN_THROW(std::invalid_argument("initCwnd must be greater than or equal to 1"));
54  }
55 
56  if (aiStep < 0.0) {
57  NDN_THROW(std::invalid_argument("aiStep must be greater than or equal to 0"));
58  }
59 
60  if (mdCoef < 0.0 || mdCoef > 1.0) {
61  NDN_THROW(std::invalid_argument("mdCoef must be in range [0, 1]"));
62  }
63 }
64 
65 SegmentFetcher::SegmentFetcher(Face& face,
66  security::v2::Validator& validator,
67  const SegmentFetcher::Options& options)
68  : m_options(options)
69  , m_face(face)
70  , m_scheduler(m_face.getIoService())
71  , m_validator(validator)
72  , m_rttEstimator(options.rttOptions)
73  , m_timeLastSegmentReceived(time::steady_clock::now())
74  , m_nextSegmentNum(0)
75  , m_cwnd(options.initCwnd)
76  , m_ssthresh(options.initSsthresh)
77  , m_nSegmentsInFlight(0)
78  , m_nSegments(0)
79  , m_highInterest(0)
80  , m_highData(0)
81  , m_recPoint(0)
82  , m_nReceived(0)
83  , m_nBytesReceived(0)
84 {
85  m_options.validate();
86 }
87 
88 shared_ptr<SegmentFetcher>
90  const Interest& baseInterest,
91  security::v2::Validator& validator,
92  const SegmentFetcher::Options& options)
93 {
94  shared_ptr<SegmentFetcher> fetcher(new SegmentFetcher(face, validator, options));
95  fetcher->m_this = fetcher;
96  fetcher->fetchFirstSegment(baseInterest, false);
97  return fetcher;
98 }
99 
100 void
102 {
103  if (!m_this) {
104  return;
105  }
106 
107  m_pendingSegments.clear(); // cancels pending Interests and timeout events
108  m_face.getIoService().post([self = std::move(m_this)] {});
109 }
110 
111 bool
112 SegmentFetcher::shouldStop(const weak_ptr<SegmentFetcher>& weakSelf)
113 {
114  auto self = weakSelf.lock();
115  return self == nullptr || self->m_this == nullptr;
116 }
117 
118 void
119 SegmentFetcher::fetchFirstSegment(const Interest& baseInterest, bool isRetransmission)
120 {
121  Interest interest(baseInterest);
122  interest.setCanBePrefix(true);
123  interest.setMustBeFresh(true);
124  interest.setInterestLifetime(m_options.interestLifetime);
125  if (isRetransmission) {
126  interest.refreshNonce();
127  }
128 
129  sendInterest(0, interest, isRetransmission);
130 }
131 
132 void
133 SegmentFetcher::fetchSegmentsInWindow(const Interest& origInterest)
134 {
135  if (checkAllSegmentsReceived()) {
136  // All segments have been retrieved
137  return finalizeFetch();
138  }
139 
140  int64_t availableWindowSize = static_cast<int64_t>(m_cwnd) - m_nSegmentsInFlight;
141  std::vector<std::pair<uint64_t, bool>> segmentsToRequest; // The boolean indicates whether a retx or not
142 
143  while (availableWindowSize > 0) {
144  if (!m_retxQueue.empty()) {
145  auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
146  m_retxQueue.pop();
147  if (pendingSegmentIt == m_pendingSegments.end()) {
148  // Skip re-requesting this segment, since it was received after RTO timeout
149  continue;
150  }
151  BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
152  segmentsToRequest.emplace_back(pendingSegmentIt->first, true);
153  }
154  else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
155  if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
156  // Don't request a segment a second time if received in response to first "discovery" Interest
157  m_nextSegmentNum++;
158  continue;
159  }
160  segmentsToRequest.emplace_back(m_nextSegmentNum++, false);
161  }
162  else {
163  break;
164  }
165  availableWindowSize--;
166  }
167 
168  for (const auto& segment : segmentsToRequest) {
169  Interest interest(origInterest); // to preserve Interest elements
170  interest.setName(Name(m_versionedDataName).appendSegment(segment.first));
171  interest.setCanBePrefix(false);
172  interest.setMustBeFresh(false);
173  interest.setInterestLifetime(m_options.interestLifetime);
174  interest.refreshNonce();
175  sendInterest(segment.first, interest, segment.second);
176  }
177 }
178 
179 void
180 SegmentFetcher::sendInterest(uint64_t segNum, const Interest& interest, bool isRetransmission)
181 {
182  weak_ptr<SegmentFetcher> weakSelf = m_this;
183 
184  ++m_nSegmentsInFlight;
185  auto pendingInterest = m_face.expressInterest(interest,
186  [this, weakSelf] (const Interest& interest, const Data& data) {
187  afterSegmentReceivedCb(interest, data, weakSelf);
188  },
189  [this, weakSelf] (const Interest& interest, const lp::Nack& nack) {
190  afterNackReceivedCb(interest, nack, weakSelf);
191  },
192  nullptr);
193 
194  auto timeout = m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto();
195  auto timeoutEvent = m_scheduler.schedule(timeout, [this, interest, weakSelf] {
196  afterTimeoutCb(interest, weakSelf);
197  });
198 
199  if (isRetransmission) {
200  updateRetransmittedSegment(segNum, pendingInterest, timeoutEvent);
201  return;
202  }
203 
204  PendingSegment pendingSegment{SegmentState::FirstInterest, time::steady_clock::now(),
205  pendingInterest, timeoutEvent};
206  bool isNew = m_pendingSegments.emplace(segNum, std::move(pendingSegment)).second;
207  BOOST_VERIFY(isNew);
208  m_highInterest = segNum;
209 }
210 
211 void
212 SegmentFetcher::afterSegmentReceivedCb(const Interest& origInterest, const Data& data,
213  const weak_ptr<SegmentFetcher>& weakSelf)
214 {
215  if (shouldStop(weakSelf))
216  return;
217 
218  BOOST_ASSERT(m_nSegmentsInFlight > 0);
219  m_nSegmentsInFlight--;
220 
221  name::Component currentSegmentComponent = data.getName().get(-1);
222  if (!currentSegmentComponent.isSegment()) {
223  return signalError(DATA_HAS_NO_SEGMENT, "Data Name has no segment number");
224  }
225 
226  uint64_t currentSegment = currentSegmentComponent.toSegment();
227 
228  // The first received Interest could have any segment ID
229  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
230  if (m_receivedSegments.size() > 0) {
231  pendingSegmentIt = m_pendingSegments.find(currentSegment);
232  }
233  else {
234  pendingSegmentIt = m_pendingSegments.begin();
235  }
236 
237  if (pendingSegmentIt == m_pendingSegments.end()) {
238  return;
239  }
240 
241  pendingSegmentIt->second.timeoutEvent.cancel();
242 
243  afterSegmentReceived(data);
244 
245  m_validator.validate(data,
246  bind(&SegmentFetcher::afterValidationSuccess, this, _1, origInterest,
247  pendingSegmentIt, weakSelf),
248  bind(&SegmentFetcher::afterValidationFailure, this, _1, _2, weakSelf));
249 }
250 
251 void
252 SegmentFetcher::afterValidationSuccess(const Data& data, const Interest& origInterest,
253  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
254  const weak_ptr<SegmentFetcher>& weakSelf)
255 {
256  if (shouldStop(weakSelf))
257  return;
258 
259  // We update the last receive time here instead of in the segment received callback so that the
260  // transfer will not fail to terminate if we only received invalid Data packets.
261  m_timeLastSegmentReceived = time::steady_clock::now();
262 
263  m_nReceived++;
264 
265  // It was verified in afterSegmentReceivedCb that the last Data name component is a segment number
266  uint64_t currentSegment = data.getName().get(-1).toSegment();
267  // Add measurement to RTO estimator (if not retransmission)
268  if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
269  m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
270  std::max<int64_t>(m_nSegmentsInFlight + 1, 1));
271  }
272 
273  // Remove from pending segments map
274  m_pendingSegments.erase(pendingSegmentIt);
275 
276  // Copy data in segment to temporary buffer
277  auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
278  std::forward_as_tuple(currentSegment),
279  std::forward_as_tuple(data.getContent().value_size()));
280  std::copy(data.getContent().value_begin(), data.getContent().value_end(),
281  receivedSegmentIt.first->second.begin());
282  m_nBytesReceived += data.getContent().value_size();
283  afterSegmentValidated(data);
284 
285  if (data.getFinalBlock()) {
286  if (!data.getFinalBlock()->isSegment()) {
287  return signalError(FINALBLOCKID_NOT_SEGMENT,
288  "Received FinalBlockId did not contain a segment component");
289  }
290 
291  if (data.getFinalBlock()->toSegment() + 1 != static_cast<uint64_t>(m_nSegments)) {
292  m_nSegments = data.getFinalBlock()->toSegment() + 1;
293  cancelExcessInFlightSegments();
294  }
295  }
296 
297  if (m_receivedSegments.size() == 1) {
298  m_versionedDataName = data.getName().getPrefix(-1);
299  if (currentSegment == 0) {
300  // We received the first segment in response, so we can increment the next segment number
301  m_nextSegmentNum++;
302  }
303  }
304 
305  if (m_highData < currentSegment) {
306  m_highData = currentSegment;
307  }
308 
309  if (data.getCongestionMark() > 0 && !m_options.ignoreCongMarks) {
310  windowDecrease();
311  }
312  else {
313  windowIncrease();
314  }
315 
316  fetchSegmentsInWindow(origInterest);
317 }
318 
319 void
320 SegmentFetcher::afterValidationFailure(const Data& data,
321  const security::v2::ValidationError& error,
322  const weak_ptr<SegmentFetcher>& weakSelf)
323 {
324  if (shouldStop(weakSelf))
325  return;
326 
327  signalError(SEGMENT_VALIDATION_FAIL, "Segment validation failed: " + boost::lexical_cast<std::string>(error));
328 }
329 
330 void
331 SegmentFetcher::afterNackReceivedCb(const Interest& origInterest, const lp::Nack& nack,
332  const weak_ptr<SegmentFetcher>& weakSelf)
333 {
334  if (shouldStop(weakSelf))
335  return;
336 
338 
339  BOOST_ASSERT(m_nSegmentsInFlight > 0);
340  m_nSegmentsInFlight--;
341 
342  switch (nack.getReason()) {
345  afterNackOrTimeout(origInterest);
346  break;
347  default:
348  signalError(NACK_ERROR, "Nack Error");
349  break;
350  }
351 }
352 
353 void
354 SegmentFetcher::afterTimeoutCb(const Interest& origInterest,
355  const weak_ptr<SegmentFetcher>& weakSelf)
356 {
357  if (shouldStop(weakSelf))
358  return;
359 
361 
362  BOOST_ASSERT(m_nSegmentsInFlight > 0);
363  m_nSegmentsInFlight--;
364  afterNackOrTimeout(origInterest);
365 }
366 
367 void
368 SegmentFetcher::afterNackOrTimeout(const Interest& origInterest)
369 {
370  if (time::steady_clock::now() >= m_timeLastSegmentReceived + m_options.maxTimeout) {
371  // Fail transfer due to exceeding the maximum timeout between the successful receipt of segments
372  return signalError(INTEREST_TIMEOUT, "Timeout exceeded");
373  }
374 
375  name::Component lastNameComponent = origInterest.getName().get(-1);
376  std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
377  BOOST_ASSERT(m_pendingSegments.size() > 0);
378  if (lastNameComponent.isSegment()) {
379  BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.toSegment()) > 0);
380  pendingSegmentIt = m_pendingSegments.find(lastNameComponent.toSegment());
381  }
382  else { // First Interest
383  BOOST_ASSERT(m_pendingSegments.size() > 0);
384  pendingSegmentIt = m_pendingSegments.begin();
385  }
386 
387  // Cancel timeout event and set status to InRetxQueue
388  pendingSegmentIt->second.timeoutEvent.cancel();
389  pendingSegmentIt->second.state = SegmentState::InRetxQueue;
390 
391  m_rttEstimator.backoffRto();
392 
393  if (m_receivedSegments.size() == 0) {
394  // Resend first Interest (until maximum receive timeout exceeded)
395  fetchFirstSegment(origInterest, true);
396  }
397  else {
398  windowDecrease();
399  m_retxQueue.push(pendingSegmentIt->first);
400  fetchSegmentsInWindow(origInterest);
401  }
402 }
403 
404 void
405 SegmentFetcher::finalizeFetch()
406 {
407  // Combine segments into final buffer
408  OBufferStream buf;
409  // We may have received more segments than exist in the object.
410  BOOST_ASSERT(m_receivedSegments.size() >= static_cast<uint64_t>(m_nSegments));
411 
412  for (int64_t i = 0; i < m_nSegments; i++) {
413  buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
414  }
415 
416  onComplete(buf.buf());
417  stop();
418 }
419 
420 void
421 SegmentFetcher::windowIncrease()
422 {
423  if (m_options.useConstantCwnd) {
424  BOOST_ASSERT(m_cwnd == m_options.initCwnd);
425  return;
426  }
427 
428  if (m_cwnd < m_ssthresh) {
429  m_cwnd += m_options.aiStep; // additive increase
430  }
431  else {
432  m_cwnd += m_options.aiStep / std::floor(m_cwnd); // congestion avoidance
433  }
434 }
435 
436 void
437 SegmentFetcher::windowDecrease()
438 {
439  if (m_options.disableCwa || m_highData > m_recPoint) {
440  m_recPoint = m_highInterest;
441 
442  if (m_options.useConstantCwnd) {
443  BOOST_ASSERT(m_cwnd == m_options.initCwnd);
444  return;
445  }
446 
447  // Refer to RFC 5681, Section 3.1 for the rationale behind the code below
448  m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef); // multiplicative decrease
449  m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
450  }
451 }
452 
453 void
454 SegmentFetcher::signalError(uint32_t code, const std::string& msg)
455 {
456  onError(code, msg);
457  stop();
458 }
459 
460 void
461 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
462  const PendingInterestHandle& pendingInterest,
463  scheduler::EventId timeoutEvent)
464 {
465  auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
466  BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
467  BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
468  pendingSegmentIt->second.state = SegmentState::Retransmitted;
469  pendingSegmentIt->second.hdl = pendingInterest; // cancels previous pending Interest via scoped handle
470  pendingSegmentIt->second.timeoutEvent = timeoutEvent;
471 }
472 
473 void
474 SegmentFetcher::cancelExcessInFlightSegments()
475 {
476  for (auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
477  if (it->first >= static_cast<uint64_t>(m_nSegments)) {
478  it = m_pendingSegments.erase(it); // cancels pending Interest and timeout event
479  BOOST_ASSERT(m_nSegmentsInFlight > 0);
480  m_nSegmentsInFlight--;
481  }
482  else {
483  ++it;
484  }
485  }
486 }
487 
488 bool
489 SegmentFetcher::checkAllSegmentsReceived()
490 {
491  bool haveReceivedAllSegments = false;
492 
493  if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
494  haveReceivedAllSegments = true;
495  // Verify that all segments in window have been received. If not, send Interests for missing segments.
496  for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
497  if (m_receivedSegments.count(i) == 0) {
498  m_retxQueue.push(i);
499  haveReceivedAllSegments = false;
500  }
501  }
502  }
503 
504  return haveReceivedAllSegments;
505 }
506 
507 time::milliseconds
508 SegmentFetcher::getEstimatedRto()
509 {
510  // We don't want an Interest timeout greater than the maximum allowed timeout between the
511  // succesful receipt of segments
512  return std::min(m_options.maxTimeout,
513  time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
514 }
515 
516 } // namespace util
517 } // namespace ndn
const Name & getName() const
Definition: interest.hpp:134
Definition: data.cpp:26
uint64_t getCongestionMark() const
get the value of the CongestionMark tag
Definition: packet-base.cpp:28
Interest & setMustBeFresh(bool mustBeFresh)
Add or remove MustBeFresh element.
Definition: interest.hpp:215
static time_point now() noexcept
Definition: time.cpp:80
An unrecoverable Nack was received during retrieval.
void refreshNonce()
Change nonce value.
Definition: interest.cpp:566
Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emits upon successful retrieval of the complete data.
time::milliseconds maxTimeout
maximum allowed time between successful receipt of segments
Utility class to fetch the latest version of a segmented object.
size_t value_size() const noexcept
Return the size of TLV-VALUE, aka TLV-LENGTH.
Definition: block.cpp:307
Represents an Interest packet.
Definition: interest.hpp:44
const optional< name::Component > & getFinalBlock() const
Definition: data.hpp:222
void stop()
Stops fetching.
Buffer::const_iterator value_begin() const
Get begin iterator of TLV-VALUE.
Definition: block.hpp:296
Buffer::const_iterator value_end() const
Get end iterator of TLV-VALUE.
Definition: block.hpp:305
represents a Network Nack
Definition: nack.hpp:38
#define NDN_THROW(e)
Definition: exception.hpp:61
NackReason getReason() const
Definition: nack.hpp:90
double aiStep
additive increase step (in segments)
bool isSegment() const
Check if the component is a segment number per NDN naming conventions.
One of the retrieved segments failed user-provided validation.
uint64_t toSegment() const
Interpret as segment number component using NDN naming conventions.
A handle of pending Interest.
Definition: face.hpp:548
Interest & setName(const Name &name)
Definition: interest.hpp:140
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::v2::Validator &validator, const Options &options=Options())
Initiates segment fetching.
Signal< SegmentFetcher > afterSegmentNacked
Emits whenever an Interest for a data segment is nacked.
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:93
Signal< SegmentFetcher, Data > afterSegmentValidated
Emits whenever a received data segment has been successfully validated.
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceeded.
One of the retrieved Data packets lacked a segment number in the last Name component (excl. implicit digest).
double initSsthresh
initial slow start threshold
double initCwnd
initial congestion window size
const Name & getName() const
Get name.
Definition: data.hpp:124
Represents a name component.
shared_ptr< Buffer > buf()
Flush written data to the stream and return shared pointer to the underlying buffer.
Signal< SegmentFetcher, Data > afterSegmentReceived
Emits whenever a data segment is received.
Signal< SegmentFetcher > afterSegmentTimedOut
Emits whenever an Interest for a data segment times out.
RttEstimator::Options rttOptions
options for RTT estimator
Validation error code and optional detailed error message.
const Block & getContent() const
Get Content.
Definition: data.cpp:232
PartialName getPrefix(ssize_t nComponents) const
Extract a prefix of the name.
Definition: name.hpp:203
Interest & setInterestLifetime(time::milliseconds lifetime)
Set Interest's lifetime.
Definition: interest.cpp:580
implements an output stream that constructs ndn::Buffer
Signal< SegmentFetcher, uint32_t, std::string > onError
Emits when the retrieval could not be completed due to an error.
A handle of scheduled event.
Definition: scheduler.hpp:59
Represents a Data packet.
Definition: data.hpp:35
A received FinalBlockId did not contain a segment component.
const Component & get(ssize_t i) const
Get the component at the given index.
Definition: name.hpp:157
Interface for validating data and interest packets.
Definition: validator.hpp:61
Interest & setCanBePrefix(bool canBePrefix)
Add or remove CanBePrefix element.
Definition: interest.hpp:187