55 time::milliseconds interestLifetime)
59 , m_lastSequenceNo(
std::numeric_limits<uint64_t>::max())
60 , m_lastNackSequenceNo(
std::numeric_limits<uint64_t>::max())
62 , m_scheduler(face.getIoService())
63 , m_nackEvent(m_scheduler)
64 , m_interestLifetime(interestLifetime)
77 this->sendInitialInterest();
87 if (m_lastInterestId != 0)
93 NotificationSubscriberBase::sendInitialInterest()
95 if (this->shouldStop())
98 auto interest = make_shared<Interest>(m_prefix);
99 interest->setMustBeFresh(
true);
100 interest->setChildSelector(1);
104 bind(&NotificationSubscriberBase::afterReceiveData,
this, _2),
105 bind(&NotificationSubscriberBase::afterReceiveNack,
this, _2),
106 bind(&NotificationSubscriberBase::afterTimeout,
this));
110 NotificationSubscriberBase::sendNextInterest()
112 if (this->shouldStop())
115 BOOST_ASSERT(m_lastSequenceNo != std::numeric_limits<uint64_t>::max());
117 Name nextName = m_prefix;
120 auto interest = make_shared<Interest>(nextName);
124 bind(&NotificationSubscriberBase::afterReceiveData,
this, _2),
125 bind(&NotificationSubscriberBase::afterReceiveNack,
this, _2),
126 bind(&NotificationSubscriberBase::afterTimeout,
this));
130 NotificationSubscriberBase::shouldStop()
134 if (!this->hasSubscriber() &&
onNack.isEmpty()) {
142 NotificationSubscriberBase::afterReceiveData(
const Data& data)
144 if (this->shouldStop())
148 m_lastSequenceNo = data.getName().get(-1).toSequenceNumber();
150 catch (
const tlv::Error&) {
152 this->sendInitialInterest();
156 if (!this->decodeAndDeliver(data)) {
158 this->sendInitialInterest();
162 this->sendNextInterest();
166 NotificationSubscriberBase::afterReceiveNack(
const lp::Nack& nack)
168 if (this->shouldStop())
173 time::milliseconds delay = exponentialBackoff(nack);
174 m_nackEvent = m_scheduler.
scheduleEvent(delay, [
this] {this->sendInitialInterest();});
178 NotificationSubscriberBase::afterTimeout()
180 if (this->shouldStop())
185 this->sendInitialInterest();
189 NotificationSubscriberBase::exponentialBackoff(
lp::Nack nack)
191 uint64_t nackSequenceNo;
194 nackSequenceNo = nack.getInterest().getName().get(-1).toSequenceNumber();
196 catch (
const tlv::Error&) {
200 if (m_lastNackSequenceNo == nackSequenceNo) {
207 m_lastNackSequenceNo = nackSequenceNo;
209 return time::milliseconds(static_cast<uint32_t>(pow(2, m_attempts) * 100 +
void start()
start or resume receiving notifications
Copyright (c) 2013-2016 Regents of the University of California.
virtual ~NotificationSubscriberBase()
time::milliseconds getInterestLifetime() const
Name & appendSequenceNumber(uint64_t seqNo)
Append sequence number using NDN naming conventions.
uint32_t generateWord32()
Generate a non-cryptographically-secure random integer in the range [0, 2^32)
signal::Signal< NotificationSubscriberBase, lp::Nack > onNack
fires when a NACK is received
signal::Signal< NotificationSubscriberBase > onTimeout
fires when no Notification is received within .getInterestLifetime period
Provide a communication channel with local or remote NDN forwarder.
Name abstraction to represent an absolute name.
EventId scheduleEvent(const time::nanoseconds &after, const EventCallback &callback)
Schedule a one-time event after the specified delay.
signal::Signal< NotificationSubscriberBase, Data > onDecodeError
fires when a Data packet in the Notification Stream cannot be decoded as Notification ...
const PendingInterestId * expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express Interest.
void removePendingInterest(const PendingInterestId *pendingInterestId)
Cancel previously expressed Interest.
NotificationSubscriberBase(Face &face, const Name &prefix, time::milliseconds interestLifetime)
construct a NotificationSubscriber
void stop()
stop receiving notifications