consumer.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2022, The University of Memphis
4  *
5  * This file is part of PSync.
6  * See AUTHORS.md for complete list of PSync authors and contributors.
7  *
8  * PSync is free software: you can redistribute it and/or modify it under the terms
9  * of the GNU Lesser General Public License as published by the Free Software Foundation,
10  * either version 3 of the License, or (at your option) any later version.
11  *
12  * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14  * PURPOSE. See the GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License along with
17  * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #ifndef PSYNC_CONSUMER_HPP
21 #define PSYNC_CONSUMER_HPP
22 
23 #include "PSync/common.hpp"
26 
27 #include <ndn-cxx/face.hpp>
28 #include <ndn-cxx/util/random.hpp>
29 #include <ndn-cxx/util/scheduler.hpp>
30 #include <ndn-cxx/util/segment-fetcher.hpp>
31 
32 #include <map>
33 
34 namespace psync {
35 
36 using ReceiveHelloCallback = std::function<void(const std::map<ndn::Name, uint64_t>&)>;
37 
55 class Consumer
56 {
57 public:
70  Consumer(const ndn::Name& syncPrefix,
71  ndn::Face& face,
72  const ReceiveHelloCallback& onReceiveHelloData,
73  const UpdateCallback& onUpdate,
74  unsigned int count,
75  double false_positive,
76  ndn::time::milliseconds helloInterestLifetime = HELLO_INTEREST_LIFETIME,
77  ndn::time::milliseconds syncInterestLifetime = SYNC_INTEREST_LIFETIME);
78 
84  void
86 
92  void
94 
107  bool
108  addSubscription(const ndn::Name& prefix, uint64_t seqNo, bool callSyncDataCb = true);
109 
116  bool
117  removeSubscription(const ndn::Name& prefix);
118 
119  std::set<ndn::Name>
121  {
122  return m_subscriptionList;
123  }
124 
125  bool
126  isSubscribed(const ndn::Name& prefix) const
127  {
128  return m_subscriptionList.find(prefix) != m_subscriptionList.end();
129  }
130 
131  std::optional<uint64_t>
132  getSeqNo(const ndn::Name& prefix) const
133  {
134  auto it = m_prefixes.find(prefix);
135  if (it == m_prefixes.end()) {
136  return std::nullopt;
137  }
138  return it->second;
139  }
140 
144  void
145  stop();
146 
147 private:
160  void
161  onHelloData(const ndn::ConstBufferPtr& bufferPtr);
162 
173  void
174  onSyncData(const ndn::ConstBufferPtr& bufferPtr);
175 
177  ndn::Face& m_face;
178  ndn::Scheduler m_scheduler;
179 
180  ndn::Name m_syncPrefix;
181  ndn::Name m_helloInterestPrefix;
182  ndn::Name m_syncInterestPrefix;
183  ndn::Name m_iblt;
184  ndn::Name m_helloDataName;
185  ndn::Name m_syncDataName;
186  uint32_t m_syncDataContentType;
187 
188  ReceiveHelloCallback m_onReceiveHelloData;
189 
190  // Called when new sync update is received from producer.
191  UpdateCallback m_onUpdate;
192 
193  // Bloom filter is used to store application/user's subscription list.
194  detail::BloomFilter m_bloomFilter;
195 
196  ndn::time::milliseconds m_helloInterestLifetime;
197  ndn::time::milliseconds m_syncInterestLifetime;
198 
199  // Store sequence number for the prefix.
200  std::map<ndn::Name, uint64_t> m_prefixes;
201  std::set<ndn::Name> m_subscriptionList;
202 
203  ndn::random::RandomNumberEngine& m_rng;
204  std::uniform_int_distribution<> m_rangeUniformRandom;
205  std::shared_ptr<ndn::util::SegmentFetcher> m_helloFetcher;
206  std::shared_ptr<ndn::util::SegmentFetcher> m_syncFetcher;
207 };
208 
209 } // namespace psync
210 
211 #endif // PSYNC_CONSUMER_HPP
#define PSYNC_PUBLIC_WITH_TESTS_ELSE_PRIVATE
Consumer logic to subscribe to producer's data.
Definition: consumer.hpp:56
bool addSubscription(const ndn::Name &prefix, uint64_t seqNo, bool callSyncDataCb=true)
Add prefix to subscription list.
Definition: consumer.cpp:55
bool removeSubscription(const ndn::Name &prefix)
Remove prefix from subscription list.
Definition: consumer.cpp:75
void stop()
Stop segment fetcher to stop the sync and free resources.
Definition: consumer.cpp:95
Consumer(const ndn::Name &syncPrefix, ndn::Face &face, const ReceiveHelloCallback &onReceiveHelloData, const UpdateCallback &onUpdate, unsigned int count, double false_positive, ndn::time::milliseconds helloInterestLifetime=HELLO_INTEREST_LIFETIME, ndn::time::milliseconds syncInterestLifetime=SYNC_INTEREST_LIFETIME)
Constructor.
Definition: consumer.cpp:30
std::optional< uint64_t > getSeqNo(const ndn::Name &prefix) const
Definition: consumer.hpp:132
void sendHelloInterest()
Send hello interest /<sync-prefix>/hello/.
Definition: consumer.cpp:112
std::set< ndn::Name > getSubscriptionList() const
Definition: consumer.hpp:120
void sendSyncInterest()
Send sync interest /<sync-prefix>/sync/<BF>/<producers-IBF>.
Definition: consumer.cpp:186
bool isSubscribed(const ndn::Name &prefix) const
Definition: consumer.hpp:126
psync
Definition: common.hpp:34
constexpr ndn::time::milliseconds SYNC_INTEREST_LIFETIME
Definition: common.hpp:40
constexpr ndn::time::milliseconds HELLO_INTEREST_LIFETIME
Definition: common.hpp:38
std::function< void(const std::vector< MissingDataInfo > &)> UpdateCallback
Definition: common.hpp:71
std::function< void(const std::map< ndn::Name, uint64_t > &)> ReceiveHelloCallback
Definition: consumer.hpp:36