full-producer.cpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
2 /*
3  * Copyright (c) 2014-2022, The University of Memphis
4  *
5  * This file is part of PSync.
6  * See AUTHORS.md for complete list of PSync authors and contributors.
7  *
8  * PSync is free software: you can redistribute it and/or modify it under the terms
9  * of the GNU Lesser General Public License as published by the Free Software Foundation,
10  * either version 3 of the License, or (at your option) any later version.
11  *
12  * PSync is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
13  * without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
14  * PURPOSE. See the GNU Lesser General Public License for more details.
15  *
16  * You should have received a copy of the GNU Lesser General Public License along with
17  * PSync, e.g., in COPYING.md file. If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include "PSync/full-producer.hpp"
21 #include "PSync/detail/state.hpp"
22 #include "PSync/detail/util.hpp"
23 
24 #include <ndn-cxx/lp/tags.hpp>
25 #include <ndn-cxx/security/validator-null.hpp>
26 #include <ndn-cxx/util/logger.hpp>
27 
28 #include <cstring>
29 
30 namespace psync {
31 
32 NDN_LOG_INIT(psync.FullProducer);
33 
35  ndn::KeyChain& keyChain,
36  size_t expectedNumEntries,
37  const ndn::Name& syncPrefix,
38  const ndn::Name& userPrefix,
39  UpdateCallback onUpdateCb,
40  ndn::time::milliseconds syncInterestLifetime,
41  ndn::time::milliseconds syncReplyFreshness,
42  CompressionScheme ibltCompression,
43  CompressionScheme contentCompression)
44  : ProducerBase(face, keyChain, expectedNumEntries, syncPrefix, userPrefix,
45  syncReplyFreshness, ibltCompression, contentCompression)
46  , m_syncInterestLifetime(syncInterestLifetime)
47  , m_onUpdate(std::move(onUpdateCb))
48 {
49  m_registeredPrefix = m_face.setInterestFilter(ndn::InterestFilter(m_syncPrefix).allowLoopback(false),
50  [this] (auto&&... args) { onSyncInterest(std::forward<decltype(args)>(args)...); },
51  [] (auto&&... args) { onRegisterFailed(std::forward<decltype(args)>(args)...); });
52 
53  // Should we do this after setInterestFilter success call back
54  // (Currently following ChronoSync's way)
55  sendSyncInterest();
56 }
57 
59 {
60  if (m_fetcher) {
61  m_fetcher->stop();
62  }
63 }
64 
65 void
66 FullProducer::publishName(const ndn::Name& prefix, std::optional<uint64_t> seq)
67 {
68  if (m_prefixes.find(prefix) == m_prefixes.end()) {
69  NDN_LOG_WARN("Prefix not added: " << prefix);
70  return;
71  }
72 
73  uint64_t newSeq = seq.value_or(m_prefixes[prefix] + 1);
74  NDN_LOG_INFO("Publish: " << prefix << "/" << newSeq);
75  updateSeqNo(prefix, newSeq);
76  satisfyPendingInterests();
77 }
78 
79 void
80 FullProducer::sendSyncInterest()
81 {
82  // If we send two sync interest one after the other
83  // since there is no new data in the network yet,
84  // when data is available it may satisfy both of them
85  if (m_fetcher) {
86  m_fetcher->stop();
87  }
88 
89  // Sync Interest format for full sync: /<sync-prefix>/<ourLatestIBF>
90  ndn::Name syncInterestName = m_syncPrefix;
91 
92  // Append our latest IBF
93  m_iblt.appendToName(syncInterestName);
94 
95  m_outstandingInterestName = syncInterestName;
96 
97  m_scheduledSyncInterestId =
98  m_scheduler.schedule(m_syncInterestLifetime / 2 + ndn::time::milliseconds(m_jitter(m_rng)),
99  [this] { sendSyncInterest(); });
100 
101  ndn::Interest syncInterest(syncInterestName);
102 
103  using ndn::util::SegmentFetcher;
104  SegmentFetcher::Options options;
105  options.interestLifetime = m_syncInterestLifetime;
106  options.maxTimeout = m_syncInterestLifetime;
107  options.rttOptions.initialRto = m_syncInterestLifetime;
108 
109  m_fetcher = SegmentFetcher::start(m_face, syncInterest,
110  ndn::security::getAcceptAllValidator(), options);
111 
112  m_fetcher->onComplete.connect([this, syncInterest] (const ndn::ConstBufferPtr& bufferPtr) {
113  onSyncData(syncInterest, bufferPtr);
114  });
115 
116  m_fetcher->afterSegmentValidated.connect([this] (const ndn::Data& data) {
117  auto tag = data.getTag<ndn::lp::IncomingFaceIdTag>();
118  if (tag) {
119  m_incomingFace = *tag;
120  }
121  else {
122  m_incomingFace = 0;
123  }
124  });
125 
126  m_fetcher->onError.connect([this] (uint32_t errorCode, const std::string& msg) {
127  NDN_LOG_ERROR("Cannot fetch sync data, error: " << errorCode << ", message: " << msg);
128  // We would like to recover from errors like NoRoute NACK quicker than sync Interest timeout.
129  // We don't react to Interest timeout here as we have scheduled the next sync Interest
130  // to be sent in half the sync Interest lifetime + jitter above. So we would react to
131  // timeout before it happens.
132  if (errorCode != SegmentFetcher::ErrorCode::INTEREST_TIMEOUT) {
133  auto after = ndn::time::milliseconds(m_jitter(m_rng));
134  NDN_LOG_DEBUG("Schedule sync interest after: " << after);
135  m_scheduledSyncInterestId = m_scheduler.schedule(after, [this] { sendSyncInterest(); });
136  }
137  });
138 
139  NDN_LOG_DEBUG("sendFullSyncInterest, nonce: " << syncInterest.getNonce() <<
140  ", hash: " << std::hash<ndn::Name>{}(syncInterestName));
141 }
142 
143 void
144 FullProducer::onSyncInterest(const ndn::Name& prefixName, const ndn::Interest& interest)
145 {
146  // TODO: answer only segments from store.
147  if (m_segmentPublisher.replyFromStore(interest.getName())) {
148  return;
149  }
150 
151  ndn::Name nameWithoutSyncPrefix = interest.getName().getSubName(prefixName.size());
152  ndn::Name interestName;
153 
154  if (nameWithoutSyncPrefix.size() == 1) {
155  // Get /<prefix>/IBF from /<prefix>/IBF
156  interestName = interest.getName();
157  }
158  else if (nameWithoutSyncPrefix.size() == 3) {
159  // Get /<prefix>/IBF from /<prefix>/IBF/<version>/<segment-no>
160  interestName = interest.getName().getPrefix(-2);
161  }
162  else {
163  return;
164  }
165 
166  ndn::name::Component ibltName = interestName.get(interestName.size() - 1);
167 
168  NDN_LOG_DEBUG("Full sync Interest received, nonce: " << interest.getNonce() <<
169  ", hash: " << std::hash<ndn::Name>{}(interestName));
170 
171  detail::IBLT iblt(m_expectedNumEntries, m_ibltCompression);
172  try {
173  iblt.initialize(ibltName);
174  }
175  catch (const std::exception& e) {
176  NDN_LOG_WARN(e.what());
177  return;
178  }
179 
180  auto diff = m_iblt - iblt;
181 
182  std::set<uint32_t> positive;
183  std::set<uint32_t> negative;
184 
185  if (!diff.listEntries(positive, negative)) {
186  NDN_LOG_TRACE("Cannot decode differences, positive: " << positive.size()
187  << " negative: " << negative.size() << " m_threshold: "
188  << m_threshold);
189 
190  // Send all data if greater then threshold, else send positive below as usual
191  // Or send if we can't get neither positive nor negative differences
192  if (positive.size() + negative.size() >= m_threshold ||
193  (positive.empty() && negative.empty())) {
194  detail::State state;
195  for (const auto& content : m_prefixes) {
196  if (content.second != 0) {
197  state.addContent(ndn::Name(content.first).appendNumber(content.second));
198  }
199  }
200 
201  if (!state.getContent().empty()) {
202  sendSyncData(interest.getName(), state.wireEncode());
203  }
204 
205 #ifdef PSYNC_WITH_TESTS
206  ++nIbfDecodeFailuresAboveThreshold;
207 #endif // PSYNC_WITH_TESTS
208  return;
209  }
210 
211 #ifdef PSYNC_WITH_TESTS
212  ++nIbfDecodeFailuresBelowThreshold;
213 #endif // PSYNC_WITH_TESTS
214  }
215 
216  detail::State state;
217  for (const auto& hash : positive) {
218  auto nameIt = m_biMap.left.find(hash);
219  if (nameIt != m_biMap.left.end()) {
220  ndn::Name nameWithoutSeq = nameIt->second.getPrefix(-1);
221  // Don't sync up sequence number zero
222  if (m_prefixes[nameWithoutSeq] != 0 &&
223  !isFutureHash(nameWithoutSeq, negative)) {
224  state.addContent(nameIt->second);
225  }
226  }
227  }
228 
229  if (!state.getContent().empty()) {
230  NDN_LOG_DEBUG("Sending sync content: " << state);
231  sendSyncData(interestName, state.wireEncode());
232  return;
233  }
234 
235  auto& entry = m_pendingEntries.emplace(interestName, PendingEntryInfo{iblt, {}}).first->second;
236  entry.expirationEvent = m_scheduler.schedule(interest.getInterestLifetime(),
237  [this, interest] {
238  NDN_LOG_TRACE("Erase pending Interest " << interest.getNonce());
239  m_pendingEntries.erase(interest.getName());
240  });
241 }
242 
243 void
244 FullProducer::sendSyncData(const ndn::Name& name, const ndn::Block& block)
245 {
246  NDN_LOG_DEBUG("Checking if data will satisfy our own pending interest");
247 
248  ndn::Name nameWithIblt;
249  m_iblt.appendToName(nameWithIblt);
250 
251  // TODO: Remove appending of hash - serves no purpose to the receiver
252  ndn::Name dataName(ndn::Name(name).appendNumber(std::hash<ndn::Name>{}(nameWithIblt)));
253 
254  auto content = detail::compress(m_contentCompression, block);
255 
256  // checking if our own interest got satisfied
257  if (m_outstandingInterestName == name) {
258  NDN_LOG_DEBUG("Satisfied our own pending interest");
259  // remove outstanding interest
260  if (m_fetcher) {
261  NDN_LOG_DEBUG("Removing our pending interest from face (stop fetcher)");
262  m_fetcher->stop();
263  m_outstandingInterestName = ndn::Name("");
264  }
265 
266  NDN_LOG_DEBUG("Sending sync Data");
267 
268  // Send data after removing pending sync interest on face
269  m_segmentPublisher.publish(name, dataName, *content, m_syncReplyFreshness);
270 
271  NDN_LOG_TRACE("Renewing sync interest");
272  sendSyncInterest();
273  }
274  else {
275  NDN_LOG_DEBUG("Sending sync Data");
276  m_segmentPublisher.publish(name, dataName, *content, m_syncReplyFreshness);
277  }
278 }
279 
280 void
281 FullProducer::onSyncData(const ndn::Interest& interest, const ndn::ConstBufferPtr& bufferPtr)
282 {
283  deletePendingInterests(interest.getName());
284 
285  detail::State state;
286  try {
287  auto decompressed = detail::decompress(m_contentCompression, *bufferPtr);
288  state.wireDecode(ndn::Block(std::move(decompressed)));
289  }
290  catch (const std::exception& e) {
291  NDN_LOG_ERROR("Cannot parse received sync Data: " << e.what());
292  return;
293  }
294  NDN_LOG_DEBUG("Sync Data received: " << state);
295 
296  std::vector<MissingDataInfo> updates;
297 
298  for (const auto& content : state) {
299  ndn::Name prefix = content.getPrefix(-1);
300  uint64_t seq = content.get(content.size() - 1).toNumber();
301 
302  if (m_prefixes.find(prefix) == m_prefixes.end() || m_prefixes[prefix] < seq) {
303  updates.push_back({prefix, m_prefixes[prefix] + 1, seq, m_incomingFace});
304  updateSeqNo(prefix, seq);
305  // We should not call satisfyPendingSyncInterests here because we just
306  // got data and deleted pending interest by calling deletePendingFullSyncInterests
307  // But we might have interests not matching to this interest that might not have deleted
308  // from pending sync interest
309  }
310  }
311 
312  // We just got the data, so send a new sync interest
313  if (!updates.empty()) {
314  m_onUpdate(updates);
315  NDN_LOG_TRACE("Renewing sync interest");
316  sendSyncInterest();
317  }
318  else {
319  NDN_LOG_TRACE("No new update, interest nonce: " << interest.getNonce() <<
320  " , hash: " << std::hash<ndn::Name>{}(interest.getName()));
321  }
322 }
323 
324 void
325 FullProducer::satisfyPendingInterests()
326 {
327  NDN_LOG_DEBUG("Satisfying full sync interest: " << m_pendingEntries.size());
328 
329  for (auto it = m_pendingEntries.begin(); it != m_pendingEntries.end();) {
330  const auto& entry = it->second;
331  auto diff = m_iblt - entry.iblt;
332  std::set<uint32_t> positive;
333  std::set<uint32_t> negative;
334 
335  if (!diff.listEntries(positive, negative)) {
336  NDN_LOG_TRACE("Decode failed for pending interest");
337  if (positive.size() + negative.size() >= m_threshold ||
338  (positive.empty() && negative.empty())) {
339  NDN_LOG_TRACE("pos + neg > threshold or no diff can be found, erase pending interest");
340  it = m_pendingEntries.erase(it);
341  continue;
342  }
343  }
344 
345  detail::State state;
346  for (const auto& hash : positive) {
347  auto nameIt = m_biMap.left.find(hash);
348  if (nameIt != m_biMap.left.end()) {
349  if (m_prefixes[nameIt->second.getPrefix(-1)] != 0) {
350  state.addContent(nameIt->second);
351  }
352  }
353  }
354 
355  if (!state.getContent().empty()) {
356  NDN_LOG_DEBUG("Satisfying sync content: " << state);
357  sendSyncData(it->first, state.wireEncode());
358  it = m_pendingEntries.erase(it);
359  }
360  else {
361  ++it;
362  }
363  }
364 }
365 
366 bool
367 FullProducer::isFutureHash(const ndn::Name& prefix, const std::set<uint32_t>& negative)
368 {
370  ndn::Name(prefix).appendNumber(m_prefixes[prefix] + 1));
371  return negative.find(nextHash) != negative.end();
372 }
373 
374 void
375 FullProducer::deletePendingInterests(const ndn::Name& interestName)
376 {
377  auto it = m_pendingEntries.find(interestName);
378  if (it != m_pendingEntries.end()) {
379  NDN_LOG_TRACE("Delete pending interest: " << interestName);
380  it = m_pendingEntries.erase(it);
381  }
382 }
383 
384 } // namespace psync
Full sync logic to synchronize with other nodes where all nodes wants to get all names prefixes synce...
FullProducer(ndn::Face &face, ndn::KeyChain &keyChain, size_t expectedNumEntries, const ndn::Name &syncPrefix, const ndn::Name &userPrefix, UpdateCallback onUpdateCallBack, ndn::time::milliseconds syncInterestLifetime=SYNC_INTEREST_LIFETIME, ndn::time::milliseconds syncReplyFreshness=SYNC_REPLY_FRESHNESS, CompressionScheme ibltCompression=CompressionScheme::DEFAULT, CompressionScheme contentCompression=CompressionScheme::DEFAULT)
Constructor.
void publishName(const ndn::Name &prefix, std::optional< uint64_t > seq=std::nullopt)
Publish name to let others know.
Base class for PartialProducer and FullProducer.
const CompressionScheme m_contentCompression
const ndn::Name m_syncPrefix
const ndn::time::milliseconds m_syncReplyFreshness
const size_t m_expectedNumEntries
std::map< ndn::Name, uint64_t > m_prefixes
const size_t m_threshold
HashNameBiMap m_biMap
ndn::Scheduler m_scheduler
static void onRegisterFailed(const ndn::Name &prefix, const std::string &msg)
Logs a message and throws if setting an interest filter fails.
const CompressionScheme m_ibltCompression
void updateSeqNo(const ndn::Name &prefix, uint64_t seq)
Update m_prefixes and IBF with the given prefix and seq.
SegmentPublisher m_segmentPublisher
ndn::random::RandomNumberEngine & m_rng
bool replyFromStore(const ndn::Name &interestName)
Try to reply from memory, return false if we cannot find the segment.
void publish(const ndn::Name &interestName, const ndn::Name &dataName, ndn::span< const uint8_t > buffer, ndn::time::milliseconds freshness)
Put all the segments in memory.
void appendToName(ndn::Name &name) const
Appends self to name.
Definition: iblt.cpp:183
uint32_t murmurHash3(const void *key, size_t len, uint32_t seed)
Definition: util.cpp:58
constexpr size_t N_HASHCHECK
Definition: iblt.hpp:73
std::shared_ptr< ndn::Buffer > compress(CompressionScheme scheme, ndn::span< const uint8_t > buffer)
Definition: util.cpp:124
std::shared_ptr< ndn::Buffer > decompress(CompressionScheme scheme, ndn::span< const uint8_t > buffer)
Definition: util.cpp:183
Definition: common.hpp:34
CompressionScheme
Definition: common.hpp:43
std::function< void(const std::vector< MissingDataInfo > &)> UpdateCallback
Definition: common.hpp:71