notification-subscriber.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2018 Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
12  *
13  * ndn-cxx library is free software: you can redistribute it and/or modify it under the
14  * terms of the GNU Lesser General Public License as published by the Free Software
15  * Foundation, either version 3 of the License, or (at your option) any later version.
16  *
17  * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
18  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
19  * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
20  *
21  * You should have received copies of the GNU General Public License and GNU Lesser
22  * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
23  * <http://www.gnu.org/licenses/>.
24  *
25  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
26  */
27 
29 #include "random.hpp"
30 
31 #include <cmath>
32 
33 namespace ndn {
34 namespace util {
35 
37  time::milliseconds interestLifetime)
38  : m_face(face)
39  , m_prefix(prefix)
40  , m_isRunning(false)
41  , m_lastSequenceNo(std::numeric_limits<uint64_t>::max())
42  , m_lastNackSequenceNo(std::numeric_limits<uint64_t>::max())
43  , m_attempts(1)
44  , m_scheduler(face.getIoService())
45  , m_nackEvent(m_scheduler)
46  , m_interestLifetime(interestLifetime)
47 {
48 }
49 
51 
52 void
54 {
55  if (m_isRunning) // already running
56  return;
57  m_isRunning = true;
58 
59  this->sendInitialInterest();
60 }
61 
62 void
64 {
65  if (!m_isRunning) // not running
66  return;
67  m_isRunning = false;
68 
69  if (m_lastInterestId != 0)
70  m_face.removePendingInterest(m_lastInterestId);
71  m_lastInterestId = 0;
72 }
73 
74 void
75 NotificationSubscriberBase::sendInitialInterest()
76 {
77  if (this->shouldStop())
78  return;
79 
80  auto interest = make_shared<Interest>(m_prefix);
81  interest->setMustBeFresh(true);
82  interest->setChildSelector(1);
83  interest->setInterestLifetime(getInterestLifetime());
84 
85  m_lastInterestId = m_face.expressInterest(*interest,
86  bind(&NotificationSubscriberBase::afterReceiveData, this, _2),
87  bind(&NotificationSubscriberBase::afterReceiveNack, this, _2),
88  bind(&NotificationSubscriberBase::afterTimeout, this));
89 }
90 
91 void
92 NotificationSubscriberBase::sendNextInterest()
93 {
94  if (this->shouldStop())
95  return;
96 
97  BOOST_ASSERT(m_lastSequenceNo != std::numeric_limits<uint64_t>::max()); // overflow or missing initial reply
98 
99  Name nextName = m_prefix;
100  nextName.appendSequenceNumber(m_lastSequenceNo + 1);
101 
102  auto interest = make_shared<Interest>(nextName);
103  interest->setInterestLifetime(getInterestLifetime());
104 
105  m_lastInterestId = m_face.expressInterest(*interest,
106  bind(&NotificationSubscriberBase::afterReceiveData, this, _2),
107  bind(&NotificationSubscriberBase::afterReceiveNack, this, _2),
108  bind(&NotificationSubscriberBase::afterTimeout, this));
109 }
110 
111 bool
112 NotificationSubscriberBase::shouldStop()
113 {
114  if (!m_isRunning)
115  return true;
116  if (!this->hasSubscriber() && onNack.isEmpty()) {
117  this->stop();
118  return true;
119  }
120  return false;
121 }
122 
123 void
124 NotificationSubscriberBase::afterReceiveData(const Data& data)
125 {
126  if (this->shouldStop())
127  return;
128 
129  try {
130  m_lastSequenceNo = data.getName().get(-1).toSequenceNumber();
131  }
132  catch (const tlv::Error&) {
133  this->onDecodeError(data);
134  this->sendInitialInterest();
135  return;
136  }
137 
138  if (!this->decodeAndDeliver(data)) {
139  this->onDecodeError(data);
140  this->sendInitialInterest();
141  return;
142  }
143 
144  this->sendNextInterest();
145 }
146 
147 void
148 NotificationSubscriberBase::afterReceiveNack(const lp::Nack& nack)
149 {
150  if (this->shouldStop())
151  return;
152 
153  this->onNack(nack);
154 
155  time::milliseconds delay = exponentialBackoff(nack);
156  m_nackEvent = m_scheduler.scheduleEvent(delay, [this] {this->sendInitialInterest();});
157 }
158 
159 void
160 NotificationSubscriberBase::afterTimeout()
161 {
162  if (this->shouldStop())
163  return;
164 
165  this->onTimeout();
166 
167  this->sendInitialInterest();
168 }
169 
170 time::milliseconds
171 NotificationSubscriberBase::exponentialBackoff(lp::Nack nack)
172 {
173  uint64_t nackSequenceNo;
174 
175  try {
176  nackSequenceNo = nack.getInterest().getName().get(-1).toSequenceNumber();
177  }
178  catch (const tlv::Error&) {
179  nackSequenceNo = 0;
180  }
181 
182  if (m_lastNackSequenceNo == nackSequenceNo) {
183  ++m_attempts;
184  }
185  else {
186  m_attempts = 1;
187  }
188 
189  m_lastNackSequenceNo = nackSequenceNo;
190 
191  return time::milliseconds(static_cast<time::milliseconds::rep>(std::pow(2, m_attempts) * 100 +
192  random::generateWord32() % 100));
193 }
194 
195 } // namespace util
196 } // namespace ndn
void start()
start or resume receiving notifications
const Name & getName() const
Definition: interest.hpp:139
Copyright (c) 2013-2017 Regents of the University of California.
Definition: common.hpp:66
time::milliseconds getInterestLifetime() const
const Interest & getInterest() const
Definition: nack.hpp:53
STL namespace.
EventId scheduleEvent(time::nanoseconds after, const EventCallback &callback)
Schedule a one-time event after the specified delay.
Definition: scheduler.cpp:87
represents a Network Nack
Definition: nack.hpp:40
uint32_t generateWord32()
Generate a non-cryptographically-secure random integer in the range [0, 2^32)
Definition: random.cpp:63
signal::Signal< NotificationSubscriberBase, lp::Nack > onNack
fires when a NACK is received
signal::Signal< NotificationSubscriberBase > onTimeout
fires when no Notification is received within the getInterestLifetime() period
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:90
Name & appendSequenceNumber(uint64_t seqNo)
Append a sequence number component.
Definition: name.hpp:406
Represents an absolute name.
Definition: name.hpp:42
const Name & getName() const
Get name.
Definition: data.hpp:121
signal::Signal< NotificationSubscriberBase, Data > onDecodeError
fires when a Data packet in the Notification Stream cannot be decoded as Notification ...
uint64_t toSequenceNumber() const
Interpret as sequence number component using NDN naming conventions.
const PendingInterestId * expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express Interest.
Definition: face.cpp:178
Represents a Data packet.
Definition: data.hpp:35
const Component & get(ssize_t i) const
Get the component at the given index.
Definition: name.hpp:164
void removePendingInterest(const PendingInterestId *pendingInterestId)
Cancel previously expressed Interest.
Definition: face.cpp:194
represents an error in TLV encoding or decoding
NotificationSubscriberBase(Face &face, const Name &prefix, time::milliseconds interestLifetime)
construct a NotificationSubscriber
void stop()
stop receiving notifications