29 #include "../name-component.hpp" 30 #include "../encoding/buffer-stream.hpp" 31 #include "../lp/nack.hpp" 32 #include "../lp/nack-header.hpp" 34 #include <boost/lexical_cast.hpp> 40 constexpr
double SegmentFetcher::MIN_SSTHRESH;
46 BOOST_THROW_EXCEPTION(std::invalid_argument(
"maxTimeout must be greater than or equal to 1 millisecond"));
50 BOOST_THROW_EXCEPTION(std::invalid_argument(
"initCwnd must be greater than or equal to 1"));
54 BOOST_THROW_EXCEPTION(std::invalid_argument(
"aiStep must be greater than or equal to 0"));
57 if (mdCoef < 0.0 || mdCoef > 1.0) {
58 BOOST_THROW_EXCEPTION(std::invalid_argument(
"mdCoef must be in range [0, 1]"));
62 SegmentFetcher::SegmentFetcher(
Face& face,
67 , m_scheduler(m_face.getIoService())
68 , m_validator(validator)
74 , m_nSegmentsInFlight(0)
85 shared_ptr<SegmentFetcher>
91 shared_ptr<SegmentFetcher> fetcher(
new SegmentFetcher(face, validator, options));
92 fetcher->fetchFirstSegment(baseInterest,
false, fetcher);
96 shared_ptr<SegmentFetcher>
108 shared_ptr<SegmentFetcher> fetcher =
start(face, baseInterest, validator, options);
109 fetcher->onComplete.connect(completeCallback);
110 fetcher->onError.connect(errorCallback);
114 shared_ptr<SegmentFetcher>
117 shared_ptr<security::v2::Validator> validator,
121 auto fetcher =
fetch(face, baseInterest, *validator, completeCallback, errorCallback);
128 SegmentFetcher::fetchFirstSegment(
const Interest& baseInterest,
129 bool isRetransmission,
130 shared_ptr<SegmentFetcher>
self)
136 if (isRetransmission) {
140 m_nSegmentsInFlight++;
141 auto pendingInterest = m_face.expressInterest(interest,
142 bind(&SegmentFetcher::afterSegmentReceivedCb,
144 bind(&SegmentFetcher::afterNackReceivedCb,
148 m_scheduler.scheduleEvent(m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto(),
149 bind(&SegmentFetcher::afterTimeoutCb,
this, interest,
self));
150 if (isRetransmission) {
151 updateRetransmittedSegment(0, pendingInterest, timeoutEvent);
154 BOOST_ASSERT(m_pendingSegments.count(0) == 0);
156 pendingInterest, timeoutEvent});
161 SegmentFetcher::fetchSegmentsInWindow(
const Interest& origInterest, shared_ptr<SegmentFetcher>
self)
163 if (checkAllSegmentsReceived()) {
168 int64_t availableWindowSize =
static_cast<int64_t
>(m_cwnd) - m_nSegmentsInFlight;
170 std::vector<std::pair<uint64_t, bool>> segmentsToRequest;
172 while (availableWindowSize > 0) {
173 if (!m_retxQueue.empty()) {
174 auto pendingSegmentIt = m_pendingSegments.find(m_retxQueue.front());
176 if (pendingSegmentIt == m_pendingSegments.end()) {
180 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
181 segmentsToRequest.emplace_back(pendingSegmentIt->first,
true);
183 else if (m_nSegments == 0 || m_nextSegmentNum < static_cast<uint64_t>(m_nSegments)) {
184 if (m_receivedSegments.count(m_nextSegmentNum) > 0) {
189 segmentsToRequest.emplace_back(m_nextSegmentNum++,
false);
194 availableWindowSize--;
197 for (
const auto& segment : segmentsToRequest) {
203 Name interestName(m_versionedDataName);
205 interest.
setName(interestName);
207 m_nSegmentsInFlight++;
208 auto pendingInterest = m_face.expressInterest(interest,
209 bind(&SegmentFetcher::afterSegmentReceivedCb,
211 bind(&SegmentFetcher::afterNackReceivedCb,
215 m_scheduler.scheduleEvent(m_options.useConstantInterestTimeout ? m_options.maxTimeout : getEstimatedRto(),
216 bind(&SegmentFetcher::afterTimeoutCb,
this, interest,
self));
217 if (segment.second) {
218 updateRetransmittedSegment(segment.first, pendingInterest, timeoutEvent);
221 BOOST_ASSERT(m_pendingSegments.count(segment.first) == 0);
222 m_pendingSegments.emplace(segment.first, PendingSegment{SegmentState::FirstInterest,
223 time::steady_clock::now(),
224 pendingInterest, timeoutEvent});
225 m_highInterest = segment.first;
231 SegmentFetcher::afterSegmentReceivedCb(
const Interest& origInterest,
233 shared_ptr<SegmentFetcher>
self)
236 BOOST_ASSERT(m_nSegmentsInFlight > 0);
237 m_nSegmentsInFlight--;
240 if (!currentSegmentComponent.
isSegment()) {
244 uint64_t currentSegment = currentSegmentComponent.
toSegment();
247 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
248 if (m_receivedSegments.size() > 0) {
249 pendingSegmentIt = m_pendingSegments.find(currentSegment);
252 pendingSegmentIt = m_pendingSegments.begin();
256 m_scheduler.cancelEvent(pendingSegmentIt->second.timeoutEvent);
257 pendingSegmentIt->second.timeoutEvent =
nullptr;
259 m_validator.validate(data,
260 bind(&SegmentFetcher::afterValidationSuccess,
this, _1, origInterest,
261 pendingSegmentIt,
self),
262 bind(&SegmentFetcher::afterValidationFailure,
this, _1, _2,
self));
266 SegmentFetcher::afterValidationSuccess(
const Data& data,
268 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt,
269 shared_ptr<SegmentFetcher>
self)
280 if (pendingSegmentIt->second.state == SegmentState::FirstInterest) {
281 m_rttEstimator.addMeasurement(m_timeLastSegmentReceived - pendingSegmentIt->second.sendTime,
282 std::max<int64_t>(m_nSegmentsInFlight + 1, 1));
286 m_pendingSegments.erase(pendingSegmentIt);
289 auto receivedSegmentIt = m_receivedSegments.emplace(std::piecewise_construct,
290 std::forward_as_tuple(currentSegment),
293 receivedSegmentIt.first->second.begin());
300 "Received FinalBlockId did not contain a segment component");
303 if (data.
getFinalBlock()->toSegment() + 1 !=
static_cast<uint64_t
>(m_nSegments)) {
305 cancelExcessInFlightSegments();
309 if (m_receivedSegments.size() == 1) {
311 if (currentSegment == 0) {
317 if (m_highData < currentSegment) {
318 m_highData = currentSegment;
328 fetchSegmentsInWindow(origInterest,
self);
332 SegmentFetcher::afterValidationFailure(
const Data& data,
334 shared_ptr<SegmentFetcher>
self)
337 boost::lexical_cast<std::string>(error));
342 SegmentFetcher::afterNackReceivedCb(
const Interest& origInterest,
344 shared_ptr<SegmentFetcher>
self)
347 BOOST_ASSERT(m_nSegmentsInFlight > 0);
348 m_nSegmentsInFlight--;
353 afterNackOrTimeout(origInterest,
self);
362 SegmentFetcher::afterTimeoutCb(
const Interest& origInterest,
363 shared_ptr<SegmentFetcher>
self)
366 BOOST_ASSERT(m_nSegmentsInFlight > 0);
367 m_nSegmentsInFlight--;
368 afterNackOrTimeout(origInterest,
self);
372 SegmentFetcher::afterNackOrTimeout(
const Interest& origInterest, shared_ptr<SegmentFetcher>
self)
380 std::map<uint64_t, PendingSegment>::iterator pendingSegmentIt;
381 BOOST_ASSERT(m_pendingSegments.size() > 0);
383 BOOST_ASSERT(m_pendingSegments.count(lastNameComponent.
toSegment()) > 0);
384 pendingSegmentIt = m_pendingSegments.find(lastNameComponent.
toSegment());
387 BOOST_ASSERT(m_pendingSegments.size() > 0);
388 pendingSegmentIt = m_pendingSegments.begin();
392 m_scheduler.cancelEvent(pendingSegmentIt->second.timeoutEvent);
393 pendingSegmentIt->second.timeoutEvent =
nullptr;
394 pendingSegmentIt->second.state = SegmentState::InRetxQueue;
396 m_rttEstimator.backoffRto();
398 if (m_receivedSegments.size() == 0) {
400 fetchFirstSegment(origInterest,
true,
self);
404 m_retxQueue.push(pendingSegmentIt->first);
405 fetchSegmentsInWindow(origInterest,
self);
410 SegmentFetcher::finalizeFetch(shared_ptr<SegmentFetcher>
self)
415 BOOST_ASSERT(m_receivedSegments.size() >=
static_cast<uint64_t
>(m_nSegments));
417 for (int64_t i = 0; i < m_nSegments; i++) {
418 buf.write(m_receivedSegments[i].get<const char>(), m_receivedSegments[i].size());
425 SegmentFetcher::windowIncrease()
427 if (m_options.useConstantCwnd) {
428 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
432 if (m_cwnd < m_ssthresh) {
433 m_cwnd += m_options.aiStep;
436 m_cwnd += m_options.aiStep / std::floor(m_cwnd);
441 SegmentFetcher::windowDecrease()
443 if (m_options.disableCwa || m_highData > m_recPoint) {
444 m_recPoint = m_highInterest;
446 if (m_options.useConstantCwnd) {
447 BOOST_ASSERT(m_cwnd == m_options.initCwnd);
452 m_ssthresh = std::max(MIN_SSTHRESH, m_cwnd * m_options.mdCoef);
453 m_cwnd = m_options.resetCwndToInit ? m_options.initCwnd : m_ssthresh;
458 SegmentFetcher::signalError(uint32_t code,
const std::string& msg)
461 for (
const auto& pendingSegment : m_pendingSegments) {
462 m_face.removePendingInterest(pendingSegment.second.id);
463 if (pendingSegment.second.timeoutEvent) {
464 m_scheduler.cancelEvent(pendingSegment.second.timeoutEvent);
471 SegmentFetcher::updateRetransmittedSegment(uint64_t segmentNum,
472 const PendingInterestId* pendingInterest,
475 auto pendingSegmentIt = m_pendingSegments.find(segmentNum);
476 BOOST_ASSERT(pendingSegmentIt != m_pendingSegments.end());
477 BOOST_ASSERT(pendingSegmentIt->second.state == SegmentState::InRetxQueue);
478 pendingSegmentIt->second.state = SegmentState::Retransmitted;
479 pendingSegmentIt->second.id = pendingInterest;
480 pendingSegmentIt->second.timeoutEvent = timeoutEvent;
484 SegmentFetcher::cancelExcessInFlightSegments()
486 for (
auto it = m_pendingSegments.begin(); it != m_pendingSegments.end();) {
487 if (it->first >= static_cast<uint64_t>(m_nSegments)) {
488 m_face.removePendingInterest(it->second.id);
489 if (it->second.timeoutEvent) {
490 m_scheduler.cancelEvent(it->second.timeoutEvent);
492 it = m_pendingSegments.erase(it);
493 BOOST_ASSERT(m_nSegmentsInFlight > 0);
494 m_nSegmentsInFlight--;
503 SegmentFetcher::checkAllSegmentsReceived()
505 bool haveReceivedAllSegments =
false;
507 if (m_nSegments != 0 && m_nReceived >= m_nSegments) {
508 haveReceivedAllSegments =
true;
510 for (uint64_t i = 0; i < static_cast<uint64_t>(m_nSegments); i++) {
511 if (m_receivedSegments.count(i) == 0) {
513 haveReceivedAllSegments =
false;
518 return haveReceivedAllSegments;
522 SegmentFetcher::getEstimatedRto()
526 return std::min(m_options.maxTimeout,
527 time::duration_cast<time::milliseconds>(m_rttEstimator.getEstimatedRto()));
const Name & getName() const
Copyright (c) 2013-2017 Regents of the University of California.
uint64_t getCongestionMark() const
get the value of the CongestionMark tag
time::milliseconds interestLifetime
lifetime of sent Interests - independent of Interest timeout
Interest & setMustBeFresh(bool mustBeFresh)
Add or remove MustBeFresh element.
function< void(uint32_t code, const std::string &msg)> ErrorCallback
static time_point now() noexcept
an unrecoverable Nack was received during retrieval
void refreshNonce()
Change nonce value.
static shared_ptr< SegmentFetcher > fetch(Face &face, const Interest &baseInterest, security::v2::Validator &validator, const CompleteCallback &completeCallback, const ErrorCallback &errorCallback)
Initiates segment fetching.
Signal< SegmentFetcher, ConstBufferPtr > onComplete
Emits upon successful retrieval of the complete data.
time::milliseconds maxTimeout
maximum allowed time between successful receipt of segments
bool useConstantInterestTimeout
if true, Interest timeout is kept at maxTimeout
Utility class to fetch the latest version of a segmented object.
Represents an Interest packet.
const optional< name::Component > & getFinalBlock() const
Buffer::const_iterator value_begin() const
Get begin iterator of TLV-VALUE.
Buffer::const_iterator value_end() const
Get end iterator of TLV-VALUE.
represents a Network Nack
NackReason getReason() const
double aiStep
additive increase step (in segments)
bool isSegment() const
Check if the component is segment number per NDN naming conventions.
one of the retrieved segments failed user-provided validation
uint64_t toSegment() const
Interpret as segment number component using NDN naming conventions.
Interest & setName(const Name &name)
static shared_ptr< SegmentFetcher > start(Face &face, const Interest &baseInterest, security::v2::Validator &validator, const Options &options=Options())
Initiates segment fetching.
Signal< SegmentFetcher > afterSegmentNacked
Emits whenever an Interest for a data segment is nacked.
Provide a communication channel with local or remote NDN forwarder.
Signal< SegmentFetcher, Data > afterSegmentValidated
Emits whenever a received data segment has been successfully validated.
retrieval timed out because the maximum timeout between the successful receipt of segments was exceeded
function< void(ConstBufferPtr data)> CompleteCallback
one of the retrieved Data packets lacked a segment number in the last Name component (excl. implicit digest)
Represents an absolute name.
double initSsthresh
initial slow start threshold
size_t value_size() const
Get size of TLV-VALUE aka TLV-LENGTH.
double initCwnd
initial congestion window size
const Name & getName() const
Get name.
Represents a name component.
shared_ptr< Buffer > buf()
Flush written data to the stream and return shared pointer to the underlying buffer.
Signal< SegmentFetcher, Data > afterSegmentReceived
Emits whenever a data segment is received.
Signal< SegmentFetcher > afterSegmentTimedOut
Emits whenever an Interest for a data segment times out.
RttEstimator::Options rttOptions
options for RTT estimator
Validation error code and optional detailed error message.
const Block & getContent() const
Get Content.
PartialName getPrefix(ssize_t nComponents) const
Extract a prefix of the name.
Interest & setInterestLifetime(time::milliseconds lifetime)
Set Interest's lifetime.
implements an output stream that constructs ndn::Buffer
time::milliseconds getInterestLifetime() const
Signal< SegmentFetcher, uint32_t, std::string > onError
Emits when the retrieval could not be completed due to an error.
Identifies a scheduled event.
Represents a Data packet.
a received FinalBlockId did not contain a segment component
bool useConstantCwnd
if true, window size is kept at initCwnd
const Component & get(ssize_t i) const
Get the component at the given index.
Name & appendSegment(uint64_t segmentNo)
Append a segment number (sequential) component.
Interface for validating data and interest packets.
Interest & setCanBePrefix(bool canBePrefix)
Add or remove CanBePrefix element.
shared_ptr< const Buffer > ConstBufferPtr