34 #include <boost/asio/io_service.hpp> 35 #include <boost/lexical_cast.hpp> 36 #include <boost/range/adaptor/map.hpp> 43 constexpr
double SegmentFetcher::MIN_SSTHRESH;
49 BOOST_THROW_EXCEPTION(std::invalid_argument(
"maxTimeout must be greater than or equal to 1 millisecond"));
53 BOOST_THROW_EXCEPTION(std::invalid_argument(
"initCwnd must be greater than or equal to 1"));
57 BOOST_THROW_EXCEPTION(std::invalid_argument(
"aiStep must be greater than or equal to 0"));
60 if (mdCoef < 0.0 || mdCoef > 1.0) {
61 BOOST_THROW_EXCEPTION(std::invalid_argument(
"mdCoef must be in range [0, 1]"));
// Constructor. The initializers visible here bind m_scheduler to the
// io_service of m_face, initialize m_validator from `validator`, and start
// the in-flight Interest counter at zero.
65 SegmentFetcher::SegmentFetcher(
Face& face,
70 , m_scheduler(m_face.getIoService())
71 , m_validator(validator)
77 , m_nSegmentsInFlight(0)
// Creates a SegmentFetcher from (face, validator, options), stores the owning
// shared_ptr in the fetcher's own m_this member, and requests the first
// segment for baseInterest as a fresh (non-retransmitted) Interest.
// NOTE(review): because m_this refers back to the fetcher itself, the object
// presumably stays alive until m_this is released elsewhere - confirm.
88 shared_ptr<SegmentFetcher>
94 shared_ptr<SegmentFetcher> fetcher(
new SegmentFetcher(face, validator, options));
95 fetcher->m_this = fetcher;
96 fetcher->fetchFirstSegment(baseInterest,
false);
107 for (
const auto& pendingSegment : m_pendingSegments | boost::adaptors::map_values) {
108 m_face.removePendingInterest(pendingSegment.id);
109 if (pendingSegment.timeoutEvent) {
110 m_scheduler.cancelEvent(pendingSegment.timeoutEvent);
113 m_face.getIoService().post([
self = std::move(m_this)] {});
// Tells a callback whether it must bail out: true when the weak reference
// can no longer be locked, or when the locked fetcher's m_this is null.
117 SegmentFetcher::shouldStop(
const weak_ptr<SegmentFetcher>& weakSelf)
119 auto self = weakSelf.lock();
120 return self ==
nullptr ||
self->m_this ==
nullptr;
// Expresses the Interest for the first segment and schedules its timeout.
//
// baseInterest     - Interest supplied by the caller.
// isRetransmission - when true, the existing pending entry for segment 0 is
//                    updated through updateRetransmittedSegment(); the other
//                    visible path asserts that no entry for segment 0 exists.
124 SegmentFetcher::fetchFirstSegment(
const Interest& baseInterest,
bool isRetransmission)
130 if (isRetransmission) {
// A weak reference is passed to every callback so that each one can check
// shouldStop() before touching the fetcher.
134 weak_ptr<SegmentFetcher> weakSelf = m_this;
136 m_nSegmentsInFlight++;
137 auto pendingInterest = m_face.expressInterest(interest,
138 bind(&SegmentFetcher::afterSegmentReceivedCb,
139 this, _1, _2, weakSelf),
140 bind(&SegmentFetcher::afterNackReceivedCb,
141 this, _1, _2, weakSelf),
// Timeout is the fixed maxTimeout when useConstantInterestTimeout is set,
// otherwise the value returned by getEstimatedRto().
144 m_scheduler.scheduleEvent(m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto(),
145 bind(&SegmentFetcher::afterTimeoutCb,
this, interest, weakSelf));
147 if (isRetransmission) {
148 updateRetransmittedSegment(0, pendingInterest, timeoutEvent);
151 BOOST_ASSERT(m_pendingSegments.count(0) == 0);
153 pendingInterest, timeoutEvent});
// Requests as many segments as the current window allows.
//
// If checkAllSegmentsReceived() reports completion, the result of
// finalizeFetch() is returned instead. Otherwise segments are selected while
// window space remains - entries from m_retxQueue first, then the next
// segment number - and an Interest plus a timeout event is issued for each.
158 SegmentFetcher::fetchSegmentsInWindow(
const Interest& origInterest)
160 weak_ptr<SegmentFetcher> weakSelf = m_this;
162 if (checkAllSegmentsReceived()) {
164 return finalizeFetch();
// Free window space: m_cwnd converted to an integer minus the Interests
// currently in flight.
167 int64_t availableWindowSize =
static_cast<int64_t
>(m_cwnd) - m_nSegmentsInFlight;
// Each pair is (segment number, is-retransmission flag).
168 std::vector<std::pair<uint64_t, bool>> segmentsToRequest;
170 while (availableWindowSize > 0) {
171 if (!m_retxQueue.empty()) {
172 auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
174 if (pendingSegmentIt == m_pendingSegments.end()) {
178 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
179 segmentsToRequest.emplace_back(pendingSegmentIt->first,
true);
// NOTE(review): m_nSegments == 0 presumably means the total number of
// segments is not known yet - confirm.
181 else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
182 if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
187 segmentsToRequest.emplace_back(m_nextSegmentNum++,
false);
192 availableWindowSize--;
// Issue one Interest per selected segment, named m_versionedDataName plus
// the segment number.
195 for (
const auto& segment : segmentsToRequest) {
197 interest.
setName(
Name(m_versionedDataName).appendSegment(segment.first));
203 m_nSegmentsInFlight++;
204 auto pendingInterest = m_face.expressInterest(interest,
205 bind(&SegmentFetcher::afterSegmentReceivedCb,
206 this, _1, _2, weakSelf),
207 bind(&SegmentFetcher::afterNackReceivedCb,
208 this, _1, _2, weakSelf),
211 m_scheduler.scheduleEvent(m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto(),
212 bind(&SegmentFetcher::afterTimeoutCb,
this, interest, weakSelf));
// Retransmissions update the existing pending entry; the other visible path
// asserts that no entry exists yet and records a new one in the
// FirstInterest state with the current steady-clock time.
214 if (segment.second) {
215 updateRetransmittedSegment(segment.first, pendingInterest, timeoutEvent);
218 BOOST_ASSERT(m_pendingSegments.count(segment.first) == 0);
219 m_pendingSegments.emplace(segment.first, PendingSegment{SegmentState::FirstInterest,
220 time::steady_clock::now(),
221 pendingInterest, timeoutEvent});
222 m_highInterest = segment.first;
// Data callback for an expressed Interest.
//
// Guarded by shouldStop(weakSelf). Decrements the in-flight counter, reads
// the segment number from a name component that is checked with isSegment(),
// locates the matching pending entry, cancels its timeout event, and hands
// the Data to m_validator with the success/failure callbacks bound below.
228 SegmentFetcher::afterSegmentReceivedCb(
const Interest& origInterest,
const Data& data,
229 const weak_ptr<SegmentFetcher>& weakSelf)
231 if (shouldStop(weakSelf))
234 BOOST_ASSERT(m_nSegmentsInFlight > 0);
235 m_nSegmentsInFlight--;
238 if (!currentSegmentComponent.
isSegment()) {
242 uint64_t currentSegment = currentSegmentComponent.
toSegment();
// With at least one segment already received the entry is looked up by
// segment number; the begin() assignment is presumably the alternative
// branch used before any segment has arrived - confirm.
245 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
246 if (m_receivedSegments.size() > 0) {
247 pendingSegmentIt = m_pendingSegments.find(currentSegment);
250 pendingSegmentIt = m_pendingSegments.begin();
253 if (pendingSegmentIt == m_pendingSegments.end()) {
260 pendingSegmentIt->second.timeoutEvent.cancel();
// NOTE(review): pendingSegmentIt is copied into the bound success callback.
// It must still be valid when the validator invokes that callback - confirm
// that no path erases the entry from m_pendingSegments in between.
262 m_validator.validate(data,
263 bind(&SegmentFetcher::afterValidationSuccess,
this, _1, origInterest,
264 pendingSegmentIt, weakSelf),
265 bind(&SegmentFetcher::afterValidationFailure,
this, _1, _2, weakSelf));
// Callback bound as the validator's success handler.
//
// Guarded by shouldStop(weakSelf). The visible steps: feed the RTT estimator
// (only for entries still in the FirstInterest state), erase the pending
// entry, store the segment in m_receivedSegments, compare the FinalBlockId
// segment number + 1 against m_nSegments, cancel excess in-flight segments,
// raise m_highData when a higher segment number arrives, and finally call
// fetchSegmentsInWindow().
269 SegmentFetcher::afterValidationSuccess(
const Data& data,
const Interest& origInterest,
270 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
271 const weak_ptr<SegmentFetcher>& weakSelf)
273 if (shouldStop(weakSelf))
// Only entries that were never moved out of FirstInterest contribute a
// sample: m_timeLastSegmentReceived minus the entry's sendTime.
285 if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
286 m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
287 std::max<int64_t>(m_nSegmentsInFlight + 1, 1));
291 m_pendingSegments.erase(pendingSegmentIt);
294 auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
295 std::forward_as_tuple(currentSegment),
298 receivedSegmentIt.first->second.begin());
305 "Received FinalBlockId did not contain a segment component");
// FinalBlockId carries the last segment number, so the segment count it
// implies is that number plus one.
308 if (data.
getFinalBlock()->toSegment() + 1 !=
static_cast<uint64_t
>(m_nSegments)) {
310 cancelExcessInFlightSegments();
314 if (m_receivedSegments.size() == 1) {
316 if (currentSegment == 0) {
322 if (m_highData < currentSegment) {
323 m_highData = currentSegment;
333 fetchSegmentsInWindow(origInterest);
// Callback bound as the validator's failure handler. Only the
// shouldStop(weakSelf) guard is visible here.
// NOTE(review): the error-reporting part of the body is not shown - confirm
// against the full source.
337 SegmentFetcher::afterValidationFailure(
const Data& data,
339 const weak_ptr<SegmentFetcher>& weakSelf)
341 if (shouldStop(weakSelf))
// Nack callback for an expressed Interest. Guarded by shouldStop(weakSelf);
// decrements the in-flight counter (asserting it was positive) and, on the
// visible path, forwards to afterNackOrTimeout().
// NOTE(review): any handling that depends on the Nack reason is not shown
// here - confirm against the full source.
348 SegmentFetcher::afterNackReceivedCb(
const Interest& origInterest,
const lp::Nack& nack,
349 const weak_ptr<SegmentFetcher>& weakSelf)
351 if (shouldStop(weakSelf))
356 BOOST_ASSERT(m_nSegmentsInFlight > 0);
357 m_nSegmentsInFlight--;
362 afterNackOrTimeout(origInterest);
// Handler scheduled as the per-Interest timeout event. Guarded by
// shouldStop(weakSelf); decrements the in-flight counter (asserting it was
// positive) and forwards to afterNackOrTimeout().
371 SegmentFetcher::afterTimeoutCb(
const Interest& origInterest,
372 const weak_ptr<SegmentFetcher>& weakSelf)
374 if (shouldStop(weakSelf))
379 BOOST_ASSERT(m_nSegmentsInFlight > 0);
380 m_nSegmentsInFlight--;
381 afterNackOrTimeout(origInterest);
// Shared recovery path for a Nack or a timeout.
//
// Locates the pending entry (by the segment number in lastNameComponent, or
// the first pending entry), cancels its timeout event, marks it InRetxQueue
// and backs off the RTO. When nothing has been received yet the first
// segment is re-requested as a retransmission; the remaining visible lines
// queue the segment number in m_retxQueue and call fetchSegmentsInWindow().
385 SegmentFetcher::afterNackOrTimeout(
const Interest& origInterest)
393 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
394 BOOST_ASSERT(m_pendingSegments.size() > 0);
396 BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.
toSegment()) > 0);
397 pendingSegmentIt = m_pendingSegments.find(lastNameComponent.
toSegment());
400 BOOST_ASSERT(m_pendingSegments.size() > 0);
401 pendingSegmentIt = m_pendingSegments.begin();
405 pendingSegmentIt->second.timeoutEvent.cancel();
406 pendingSegmentIt->second.state = SegmentState::InRetxQueue;
408 m_rttEstimator.backoffRto();
// No segment received so far: retry the first-segment request.
410 if (m_receivedSegments.size() == 0) {
412 fetchFirstSegment(origInterest,
true);
416 m_retxQueue.push(pendingSegmentIt->first);
417 fetchSegmentsInWindow(origInterest);
// Reassembles the object: asserts that at least m_nSegments segments are
// stored, then writes segments 0 .. m_nSegments-1 into buf in ascending
// order.
// NOTE(review): m_receivedSegments[i] is an operator[] lookup; if the
// container is a std::map, a missing key would be default-inserted rather
// than reported - the assert above checks only the count.
422 SegmentFetcher::finalizeFetch()
427 BOOST_ASSERT(m_receivedSegments.size() >=
static_cast<uint64_t
>(m_nSegments));
429 for (int64_t i = 0; i < m_nSegments; i++) {
430 buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
// Grows the congestion window m_cwnd.
//
// With useConstantCwnd set, the window is asserted to still equal initCwnd.
// Below m_ssthresh the window grows by aiStep; the last visible line grows
// it by aiStep / floor(m_cwnd) instead (presumably the branch taken at or
// above the threshold - confirm).
438 SegmentFetcher::windowIncrease()
440 if (m_options.useConstantCwnd) {
441 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
445 if (m_cwnd < m_ssthresh) {
446 m_cwnd += m_options.aiStep;
449 m_cwnd += m_options.aiStep / std::floor(m_cwnd);
// Shrinks the congestion window m_cwnd.
//
// Acts only when disableCwa is set or m_highData has passed m_recPoint; in
// that case m_recPoint is moved to m_highInterest. With useConstantCwnd set
// the window is asserted to still equal initCwnd. The threshold becomes
// max(MIN_SSTHRESH, m_cwnd * mdCoef), and the window is set either to
// initCwnd (resetCwndToInit) or to the new threshold.
454 SegmentFetcher::windowDecrease()
456 if (m_options.disableCwa || m_highData > m_recPoint) {
457 m_recPoint = m_highInterest;
459 if (m_options.useConstantCwnd) {
460 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
465 m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef);
466 m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
// Takes a numeric error code and a human-readable message.
// NOTE(review): the body is not shown here; presumably it reports the error
// to the caller - confirm against the full source.
471 SegmentFetcher::signalError(uint32_t code,
const std::string& msg)
// Re-arms the pending entry of a segment that is being retransmitted.
//
// The entry for segmentNum must exist and be in the InRetxQueue state (both
// asserted). It is switched to Retransmitted, its previous pending Interest
// is removed from the face, and the new pending Interest id and timeout
// event replace the old ones.
478 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
479 const PendingInterestId* pendingInterest,
482 auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
483 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
484 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
485 pendingSegmentIt->second.state = SegmentState::Retransmitted;
486 m_face.removePendingInterest(pendingSegmentIt->second.id);
487 pendingSegmentIt->second.id = pendingInterest;
488 pendingSegmentIt->second.timeoutEvent = timeoutEvent;
// Drops pending entries whose segment number is at or beyond m_nSegments:
// the pending Interest is removed from the face, the timeout event is
// cancelled if one is set, the entry is erased, and the in-flight counter is
// decremented (asserting it was positive).
492 SegmentFetcher::cancelExcessInFlightSegments()
// The loop header has no increment: erase() supplies the next iterator for
// removed entries.
494 for (
auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
495 if (it->first >= static_cast<uint64_t>(m_nSegments)) {
496 m_face.removePendingInterest(it->second.id);
497 if (it->second.timeoutEvent) {
498 m_scheduler.cancelEvent(it->second.timeoutEvent);
500 it = m_pendingSegments.erase(it);
501 BOOST_ASSERT(m_nSegmentsInFlight > 0);
502 m_nSegmentsInFlight--;
// Reports whether every segment of the object has been received.
//
// The result starts as false. It can become true only when m_nSegments is
// non-zero and m_nReceived has reached it; each segment number in
// [0, m_nSegments) is then checked, and a missing one resets the result to
// false.
511 SegmentFetcher::checkAllSegmentsReceived()
513 bool haveReceivedAllSegments =
false;
515 if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
516 haveReceivedAllSegments =
true;
518 for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
519 if (m_receivedSegments.count(i) == 0) {
521 haveReceivedAllSegments =
false;
526 return haveReceivedAllSegments;
// Returns the RTT estimator's RTO converted to milliseconds, capped at
// m_options.maxTimeout.
530 SegmentFetcher::getEstimatedRto()
534 return std::min(m_options.maxTimeout,
535 time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
const Name & getName() const
uint64_t getCongestionMark() const
Get the value of the CongestionMark tag.
Interest & setMustBeFresh(bool mustBeFresh)
Add or remove MustBeFresh element.
static time_point now() noexcept
An unrecoverable Nack was received during retrieval.
void refreshNonce()
Change nonce value.
Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emits upon successful retrieval of the complete data.
time::milliseconds maxTimeout
maximum allowed time between successful receipt of segments
Utility class to fetch the latest version of a segmented object.
Represents an Interest packet.
const optional< name::Component > & getFinalBlock() const
void stop()
Stops fetching.
Buffer::const_iterator value_begin() const
Get begin iterator of TLV-VALUE.
Buffer::const_iterator value_end() const
Get end iterator of TLV-VALUE.
Represents a Network Nack.
NackReason getReason() const
double aiStep
additive increase step (in segments)
bool isSegment() const
Check if the component is segment number per NDN naming conventions.
One of the retrieved segments failed user-provided validation.
uint64_t toSegment() const
Interpret as segment number component using NDN naming conventions.
Interest & setName(const Name &name)
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::v2::Validator &validator, const Options &options=Options())
Initiates segment fetching.
Signal< SegmentFetcher > afterSegmentNacked
Emits whenever an Interest for a data segment is nacked.
Provide a communication channel with local or remote NDN forwarder.
Signal< SegmentFetcher, Data > afterSegmentValidated
Emits whenever a received data segment has been successfully validated.
Retrieval timed out because the maximum timeout between the successful receipt of segments was exceeded.
One of the retrieved Data packets lacked a segment number in the last Name component (excl. implicit digest).
double initSsthresh
initial slow start threshold
size_t value_size() const
Get size of TLV-VALUE aka TLV-LENGTH.
double initCwnd
initial congestion window size
const Name & getName() const
Get name.
Represents a name component.
shared_ptr< Buffer > buf()
Flush written data to the stream and return shared pointer to the underlying buffer.
Signal< SegmentFetcher, Data > afterSegmentReceived
Emits whenever a data segment is received.
Signal< SegmentFetcher > afterSegmentTimedOut
Emits whenever an Interest for a data segment times out.
RttEstimator::Options rttOptions
options for RTT estimator
Validation error code and optional detailed error message.
const Block & getContent() const
Get Content.
PartialName getPrefix(ssize_t nComponents) const
Extract a prefix of the name.
Interest & setInterestLifetime(time::milliseconds lifetime)
Set Interest's lifetime.
Implements an output stream that constructs ndn::Buffer.
Signal< SegmentFetcher, uint32_t, std::string > onError
Emits when the retrieval could not be completed due to an error.
A handle of scheduled event.
Represents a Data packet.
A received FinalBlockId did not contain a segment component.
const Component & get(ssize_t i) const
Get the component at the given index.
Interface for validating data and interest packets.
Interest & setCanBePrefix(bool canBePrefix)
Add or remove CanBePrefix element.