notification-subscriber.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
49 #include "random.hpp"
50 
51 namespace ndn {
52 namespace util {
53 
55  time::milliseconds interestLifetime)
56  : m_face(face)
57  , m_prefix(prefix)
58  , m_isRunning(false)
59  , m_lastSequenceNo(std::numeric_limits<uint64_t>::max())
60  , m_lastNackSequenceNo(std::numeric_limits<uint64_t>::max())
61  , m_attempts(1)
62  , m_scheduler(face.getIoService())
63  , m_nackEvent(m_scheduler)
64  , m_interestLifetime(interestLifetime)
65 {
66 }
67 
69 
70 void
72 {
73  if (m_isRunning) // already running
74  return;
75  m_isRunning = true;
76 
77  this->sendInitialInterest();
78 }
79 
80 void
82 {
83  if (!m_isRunning) // not running
84  return;
85  m_isRunning = false;
86 
87  if (m_lastInterestId != 0)
88  m_face.removePendingInterest(m_lastInterestId);
89  m_lastInterestId = 0;
90 }
91 
92 void
93 NotificationSubscriberBase::sendInitialInterest()
94 {
95  if (this->shouldStop())
96  return;
97 
98  auto interest = make_shared<Interest>(m_prefix);
99  interest->setMustBeFresh(true);
100  interest->setChildSelector(1);
101  interest->setInterestLifetime(getInterestLifetime());
102 
103  m_lastInterestId = m_face.expressInterest(*interest,
104  bind(&NotificationSubscriberBase::afterReceiveData, this, _2),
105  bind(&NotificationSubscriberBase::afterReceiveNack, this, _2),
106  bind(&NotificationSubscriberBase::afterTimeout, this));
107 }
108 
109 void
110 NotificationSubscriberBase::sendNextInterest()
111 {
112  if (this->shouldStop())
113  return;
114 
115  BOOST_ASSERT(m_lastSequenceNo != std::numeric_limits<uint64_t>::max()); // overflow or missing initial reply
116 
117  Name nextName = m_prefix;
118  nextName.appendSequenceNumber(m_lastSequenceNo + 1);
119 
120  auto interest = make_shared<Interest>(nextName);
121  interest->setInterestLifetime(getInterestLifetime());
122 
123  m_lastInterestId = m_face.expressInterest(*interest,
124  bind(&NotificationSubscriberBase::afterReceiveData, this, _2),
125  bind(&NotificationSubscriberBase::afterReceiveNack, this, _2),
126  bind(&NotificationSubscriberBase::afterTimeout, this));
127 }
128 
129 bool
130 NotificationSubscriberBase::shouldStop()
131 {
132  if (!m_isRunning)
133  return true;
134  if (!this->hasSubscriber() && onNack.isEmpty()) {
135  this->stop();
136  return true;
137  }
138  return false;
139 }
140 
141 void
142 NotificationSubscriberBase::afterReceiveData(const Data& data)
143 {
144  if (this->shouldStop())
145  return;
146 
147  try {
148  m_lastSequenceNo = data.getName().get(-1).toSequenceNumber();
149  }
150  catch (const tlv::Error&) {
151  this->onDecodeError(data);
152  this->sendInitialInterest();
153  return;
154  }
155 
156  if (!this->decodeAndDeliver(data)) {
157  this->onDecodeError(data);
158  this->sendInitialInterest();
159  return;
160  }
161 
162  this->sendNextInterest();
163 }
164 
165 void
166 NotificationSubscriberBase::afterReceiveNack(const lp::Nack& nack)
167 {
168  if (this->shouldStop())
169  return;
170 
171  this->onNack(nack);
172 
173  time::milliseconds delay = exponentialBackoff(nack);
174  m_nackEvent = m_scheduler.scheduleEvent(delay, [this] {this->sendInitialInterest();});
175 }
176 
177 void
178 NotificationSubscriberBase::afterTimeout()
179 {
180  if (this->shouldStop())
181  return;
182 
183  this->onTimeout();
184 
185  this->sendInitialInterest();
186 }
187 
188 time::milliseconds
189 NotificationSubscriberBase::exponentialBackoff(lp::Nack nack)
190 {
191  uint64_t nackSequenceNo;
192 
193  try {
194  nackSequenceNo = nack.getInterest().getName().get(-1).toSequenceNumber();
195  }
196  catch (const tlv::Error&) {
197  nackSequenceNo = 0;
198  }
199 
200  if (m_lastNackSequenceNo == nackSequenceNo) {
201  ++m_attempts;
202  }
203  else {
204  m_attempts = 1;
205  }
206 
207  m_lastNackSequenceNo = nackSequenceNo;
208 
209  return time::milliseconds(static_cast<uint32_t>(pow(2, m_attempts) * 100 +
210  random::generateWord32() % 100));
211 }
212 
213 } // namespace util
214 } // namespace ndn
void start()
start or resume receiving notifications
Copyright (c) 2013-2016 Regents of the University of California.
Definition: common.hpp:74
time::milliseconds getInterestLifetime() const
STL namespace.
Name & appendSequenceNumber(uint64_t seqNo)
Append sequence number using NDN naming conventions.
Definition: name.cpp:241
uint32_t generateWord32()
Generate a non-cryptographically-secure random integer in the range [0, 2^32)
Definition: random.cpp:63
signal::Signal< NotificationSubscriberBase, lp::Nack > onNack
fires when a NACK is received
signal::Signal< NotificationSubscriberBase > onTimeout
fires when no Notification is received within the getInterestLifetime() period
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:121
Name abstraction to represent an absolute name.
Definition: name.hpp:46
EventId scheduleEvent(const time::nanoseconds &after, const EventCallback &callback)
Schedule a one-time event after the specified delay.
Definition: scheduler.cpp:84
signal::Signal< NotificationSubscriberBase, Data > onDecodeError
fires when a Data packet in the Notification Stream cannot be decoded as Notification ...
const PendingInterestId * expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express Interest.
Definition: face.cpp:166
void removePendingInterest(const PendingInterestId *pendingInterestId)
Cancel previously expressed Interest.
Definition: face.cpp:216
NotificationSubscriberBase(Face &face, const Name &prefix, time::milliseconds interestLifetime)
construct a NotificationSubscriber
void stop()
stop receiving notifications