notification-subscriber.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2018 Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
12  *
13  * ndn-cxx library is free software: you can redistribute it and/or modify it under the
14  * terms of the GNU Lesser General Public License as published by the Free Software
15  * Foundation, either version 3 of the License, or (at your option) any later version.
16  *
17  * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
18  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
19  * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
20  *
21  * You should have received copies of the GNU General Public License and GNU Lesser
22  * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
23  * <http://www.gnu.org/licenses/>.
24  *
25  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
26  */
27 
29 #include "random.hpp"
30 
31 #include <cmath>
32 
33 namespace ndn {
34 namespace util {
35 
37  time::milliseconds interestLifetime)
38  : m_face(face)
39  , m_prefix(prefix)
40  , m_isRunning(false)
41  , m_lastSequenceNo(std::numeric_limits<uint64_t>::max())
42  , m_lastNackSequenceNo(std::numeric_limits<uint64_t>::max())
43  , m_attempts(1)
44  , m_scheduler(face.getIoService())
45  , m_nackEvent(m_scheduler)
46  , m_lastInterestId(nullptr)
47  , m_interestLifetime(interestLifetime)
48 {
49 }
50 
52 
53 void
55 {
56  if (m_isRunning) // already running
57  return;
58  m_isRunning = true;
59 
60  sendInitialInterest();
61 }
62 
63 void
65 {
66  if (!m_isRunning) // not running
67  return;
68  m_isRunning = false;
69 
70  if (m_lastInterestId != nullptr)
71  m_face.removePendingInterest(m_lastInterestId);
72  m_lastInterestId = nullptr;
73 }
74 
75 void
76 NotificationSubscriberBase::sendInitialInterest()
77 {
78  if (shouldStop())
79  return;
80 
81  auto interest = make_shared<Interest>(m_prefix);
82  interest->setCanBePrefix(true);
83  interest->setMustBeFresh(true);
84  interest->setChildSelector(1);
85  interest->setInterestLifetime(getInterestLifetime());
86 
87  m_lastInterestId = m_face.expressInterest(*interest,
88  [this] (const auto&, const auto& d) { this->afterReceiveData(d); },
89  [this] (const auto&, const auto& n) { this->afterReceiveNack(n); },
90  [this] (const auto&) { this->afterTimeout(); });
91 }
92 
93 void
94 NotificationSubscriberBase::sendNextInterest()
95 {
96  if (shouldStop())
97  return;
98 
99  BOOST_ASSERT(m_lastSequenceNo != std::numeric_limits<uint64_t>::max()); // overflow or missing initial reply
100 
101  Name nextName = m_prefix;
102  nextName.appendSequenceNumber(m_lastSequenceNo + 1);
103 
104  auto interest = make_shared<Interest>(nextName);
105  interest->setCanBePrefix(false);
106  interest->setInterestLifetime(getInterestLifetime());
107 
108  m_lastInterestId = m_face.expressInterest(*interest,
109  [this] (const auto&, const auto& d) { this->afterReceiveData(d); },
110  [this] (const auto&, const auto& n) { this->afterReceiveNack(n); },
111  [this] (const auto&) { this->afterTimeout(); });
112 }
113 
114 bool
115 NotificationSubscriberBase::shouldStop()
116 {
117  if (!m_isRunning)
118  return true;
119 
120  if (!hasSubscriber() && onNack.isEmpty()) {
121  stop();
122  return true;
123  }
124  return false;
125 }
126 
127 void
128 NotificationSubscriberBase::afterReceiveData(const Data& data)
129 {
130  if (shouldStop())
131  return;
132 
133  try {
134  m_lastSequenceNo = data.getName().get(-1).toSequenceNumber();
135  }
136  catch (const tlv::Error&) {
137  onDecodeError(data);
138  sendInitialInterest();
139  return;
140  }
141 
142  if (!decodeAndDeliver(data)) {
143  onDecodeError(data);
144  sendInitialInterest();
145  return;
146  }
147 
148  sendNextInterest();
149 }
150 
151 void
152 NotificationSubscriberBase::afterReceiveNack(const lp::Nack& nack)
153 {
154  if (shouldStop())
155  return;
156 
157  onNack(nack);
158 
159  time::milliseconds delay = exponentialBackoff(nack);
160  m_nackEvent = m_scheduler.scheduleEvent(delay, [this] { sendInitialInterest(); });
161 }
162 
163 void
164 NotificationSubscriberBase::afterTimeout()
165 {
166  if (shouldStop())
167  return;
168 
169  onTimeout();
170 
171  sendInitialInterest();
172 }
173 
174 time::milliseconds
175 NotificationSubscriberBase::exponentialBackoff(lp::Nack nack)
176 {
177  uint64_t nackSequenceNo;
178  try {
179  nackSequenceNo = nack.getInterest().getName().get(-1).toSequenceNumber();
180  }
181  catch (const tlv::Error&) {
182  nackSequenceNo = 0;
183  }
184 
185  if (m_lastNackSequenceNo == nackSequenceNo) {
186  ++m_attempts;
187  }
188  else {
189  m_attempts = 1;
190  }
191 
192  m_lastNackSequenceNo = nackSequenceNo;
193 
194  return time::milliseconds(static_cast<time::milliseconds::rep>(std::pow(2, m_attempts) * 100 +
195  random::generateWord32() % 100));
196 }
197 
198 } // namespace util
199 } // namespace ndn
void start()
start or resume receiving notifications
const Name & getName() const
Definition: interest.hpp:133
Copyright (c) 2013-2017 Regents of the University of California.
Definition: common.hpp:65
time::milliseconds getInterestLifetime() const
const Interest & getInterest() const
Definition: nack.hpp:53
STL namespace.
EventId scheduleEvent(time::nanoseconds after, const EventCallback &callback)
Schedule a one-time event after the specified delay.
Definition: scheduler.cpp:88
represents a Network Nack
Definition: nack.hpp:40
uint32_t generateWord32()
Generate a non-cryptographically-secure random integer in the range [0, 2^32)
Definition: random.cpp:63
signal::Signal< NotificationSubscriberBase, lp::Nack > onNack
fires when a NACK is received
signal::Signal< NotificationSubscriberBase > onTimeout
fires when no Notification is received within the getInterestLifetime() period
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:90
Name & appendSequenceNumber(uint64_t seqNo)
Append a sequence number component.
Definition: name.hpp:418
Represents an absolute name.
Definition: name.hpp:42
const Name & getName() const
Get name.
Definition: data.hpp:124
signal::Signal< NotificationSubscriberBase, Data > onDecodeError
fires when a Data packet in the Notification Stream cannot be decoded as Notification ...
uint64_t toSequenceNumber() const
Interpret as sequence number component using NDN naming conventions.
const PendingInterestId * expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express Interest.
Definition: face.cpp:178
Represents a Data packet.
Definition: data.hpp:35
const Component & get(ssize_t i) const
Get the component at the given index.
Definition: name.hpp:156
void removePendingInterest(const PendingInterestId *pendingInterestId)
Cancel previously expressed Interest.
Definition: face.cpp:194
represents an error in TLV encoding or decoding
NotificationSubscriberBase(Face &face, const Name &prefix, time::milliseconds interestLifetime)
construct a NotificationSubscriber
void stop()
stop receiving notifications