notification-subscriber.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2022 Regents of the University of California,
4  * Arizona Board of Regents,
5  * Colorado State University,
6  * University Pierre & Marie Curie, Sorbonne University,
7  * Washington University in St. Louis,
8  * Beijing Institute of Technology,
9  * The University of Memphis.
10  *
11  * This file is part of ndn-cxx library (NDN C++ library with eXperimental eXtensions).
12  *
13  * ndn-cxx library is free software: you can redistribute it and/or modify it under the
14  * terms of the GNU Lesser General Public License as published by the Free Software
15  * Foundation, either version 3 of the License, or (at your option) any later version.
16  *
17  * ndn-cxx library is distributed in the hope that it will be useful, but WITHOUT ANY
18  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
19  * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
20  *
21  * You should have received copies of the GNU General Public License and GNU Lesser
22  * General Public License along with ndn-cxx, e.g., in COPYING.md file. If not, see
23  * <http://www.gnu.org/licenses/>.
24  *
25  * See AUTHORS.md for complete list of ndn-cxx authors and contributors.
26  */
27 
29 #include "ndn-cxx/util/random.hpp"
30 
31 #include <cmath>
32 
33 namespace ndn {
34 namespace util {
35 
37  time::milliseconds interestLifetime)
38  : m_face(face)
39  , m_prefix(prefix)
40  , m_isRunning(false)
41  , m_lastSequenceNum(std::numeric_limits<uint64_t>::max())
42  , m_lastNackSequenceNum(std::numeric_limits<uint64_t>::max())
43  , m_attempts(1)
44  , m_scheduler(face.getIoService())
45  , m_interestLifetime(interestLifetime)
46 {
47 }
48 
50 
51 void
53 {
54  if (m_isRunning) // already running
55  return;
56  m_isRunning = true;
57 
58  sendInitialInterest();
59 }
60 
61 void
63 {
64  if (!m_isRunning) // not running
65  return;
66  m_isRunning = false;
67 
68  m_lastInterest.cancel();
69 }
70 
71 void
72 NotificationSubscriberBase::sendInitialInterest()
73 {
74  if (shouldStop())
75  return;
76 
77  auto interest = make_shared<Interest>(m_prefix);
78  interest->setCanBePrefix(true);
79  interest->setMustBeFresh(true);
80  interest->setInterestLifetime(m_interestLifetime);
81  sendInterest(*interest);
82 }
83 
84 void
85 NotificationSubscriberBase::sendNextInterest()
86 {
87  if (shouldStop())
88  return;
89 
90  Name nextName = m_prefix;
91  nextName.appendSequenceNumber(m_lastSequenceNum + 1);
92 
93  auto interest = make_shared<Interest>(nextName);
94  interest->setInterestLifetime(m_interestLifetime);
95  sendInterest(*interest);
96 }
97 
98 void
99 NotificationSubscriberBase::sendInterest(const Interest& interest)
100 {
101  m_lastInterest = m_face.expressInterest(interest,
102  [this] (const auto&, const auto& d) { this->afterReceiveData(d); },
103  [this] (const auto&, const auto& n) { this->afterReceiveNack(n); },
104  [this] (const auto&) { this->afterTimeout(); });
105 }
106 
107 bool
108 NotificationSubscriberBase::shouldStop()
109 {
110  if (!m_isRunning)
111  return true;
112 
113  if (!hasSubscriber() && onNack.isEmpty()) {
114  stop();
115  return true;
116  }
117  return false;
118 }
119 
120 void
121 NotificationSubscriberBase::afterReceiveData(const Data& data)
122 {
123  if (shouldStop())
124  return;
125 
126  try {
127  m_lastSequenceNum = data.getName().get(-1).toSequenceNumber();
128  }
129  catch (const tlv::Error&) {
130  onDecodeError(data);
131  sendInitialInterest();
132  return;
133  }
134 
135  if (!decodeAndDeliver(data)) {
136  onDecodeError(data);
137  sendInitialInterest();
138  return;
139  }
140 
141  sendNextInterest();
142 }
143 
144 void
145 NotificationSubscriberBase::afterReceiveNack(const lp::Nack& nack)
146 {
147  if (shouldStop())
148  return;
149 
150  onNack(nack);
151 
152  time::milliseconds delay = exponentialBackoff(nack);
153  m_nackEvent = m_scheduler.schedule(delay, [this] { sendInitialInterest(); });
154 }
155 
156 void
157 NotificationSubscriberBase::afterTimeout()
158 {
159  if (shouldStop())
160  return;
161 
162  onTimeout();
163 
164  sendInitialInterest();
165 }
166 
168 NotificationSubscriberBase::exponentialBackoff(lp::Nack nack)
169 {
170  uint64_t nackSequenceNum;
171  try {
172  nackSequenceNum = nack.getInterest().getName().get(-1).toSequenceNumber();
173  }
174  catch (const tlv::Error&) {
175  nackSequenceNum = 0;
176  }
177 
178  if (m_lastNackSequenceNum == nackSequenceNum) {
179  ++m_attempts;
180  }
181  else {
182  m_attempts = 1;
183  }
184 
185  m_lastNackSequenceNum = nackSequenceNum;
186 
187  return time::milliseconds(static_cast<time::milliseconds::rep>(std::pow(2, m_attempts) * 100 +
188  random::generateWord32() % 100));
189 }
190 
191 } // namespace util
192 } // namespace ndn
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:91
PendingInterestHandle expressInterest(const Interest &interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Express an Interest.
Definition: face.cpp:164
Represents an absolute name.
Definition: name.hpp:44
Name & appendSequenceNumber(uint64_t seqNo)
Append a sequence number component.
Definition: name.hpp:469
void cancel()
Cancel the operation.
EventId schedule(time::nanoseconds after, EventCallback callback)
Schedule a one-time event after the specified delay.
Definition: scheduler.cpp:92
void start()
Start or resume receiving notifications.
signal::Signal< NotificationSubscriberBase, Data > onDecodeError
Fires when a Data packet in the Notification Stream cannot be decoded as Notification.
void stop()
Stop receiving notifications.
signal::Signal< NotificationSubscriberBase > onTimeout
Fires when no Notification is received within getInterestLifetime() period.
NotificationSubscriberBase(Face &face, const Name &prefix, time::milliseconds interestLifetime)
Construct a NotificationSubscriber.
signal::Signal< NotificationSubscriberBase, lp::Nack > onNack
Fires when a Nack is received.
uint32_t generateWord32()
Generate a non-cryptographically-secure random integer in the range [0, 2^32)
Definition: random.cpp:68
boost::chrono::milliseconds milliseconds
Definition: time.hpp:48
@ Data
Definition: tlv.hpp:69
@ Interest
Definition: tlv.hpp:68
Definition: data.cpp:25