face-impl.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
22 #ifndef NDN_DETAIL_FACE_IMPL_HPP
23 #define NDN_DETAIL_FACE_IMPL_HPP
24 
#include "../common.hpp"
#include "../face.hpp"

#include "registered-prefix.hpp"
#include "pending-interest.hpp"
#include "container-with-on-empty-signal.hpp"

#include "../util/scheduler.hpp"
#include "../util/config-file.hpp"
#include "../util/signal.hpp"

#include "../transport/transport.hpp"
#include "../transport/unix-transport.hpp"
#include "../transport/tcp-transport.hpp"

#include "../mgmt/nfd/controller.hpp"
#include "../mgmt/nfd/command-options.hpp"

#include "../lp/packet.hpp"
#include "../lp/tags.hpp"
45 
46 namespace ndn {
47 
51 class Face::Impl : noncopyable
52 {
53 public:
55  typedef std::list<shared_ptr<InterestFilterRecord>> InterestFilterTable;
57 
58  explicit
59  Impl(Face& face)
60  : m_face(face)
61  , m_scheduler(m_face.getIoService())
62  , m_processEventsTimeoutEvent(m_scheduler)
63  {
64  auto postOnEmptyPitOrNoRegisteredPrefixes = [this] {
65  this->m_face.getIoService().post([this] { this->onEmptyPitOrNoRegisteredPrefixes(); });
66  // without this extra "post", transport can get paused (-async_read) and then resumed
67  // (+async_read) from within onInterest/onData callback. After onInterest/onData
68  // finishes, there is another +async_read with the same memory block. A few of such
69  // async_read duplications can cause various effects and result in segfault.
70  };
71 
72  m_pendingInterestTable.onEmpty.connect(postOnEmptyPitOrNoRegisteredPrefixes);
73  m_registeredPrefixTable.onEmpty.connect(postOnEmptyPitOrNoRegisteredPrefixes);
74  }
75 
76 public: // consumer
77  void
78  asyncExpressInterest(shared_ptr<const Interest> interest,
79  const DataCallback& afterSatisfied,
80  const NackCallback& afterNacked,
81  const TimeoutCallback& afterTimeout)
82  {
83  this->ensureConnected(true);
84 
85  auto entry = m_pendingInterestTable.insert(make_shared<PendingInterest>(
86  interest, afterSatisfied, afterNacked, afterTimeout, ref(m_scheduler))).first;
87  (*entry)->setDeleter([this, entry] { m_pendingInterestTable.erase(entry); });
88 
89  lp::Packet packet;
90 
91  shared_ptr<lp::NextHopFaceIdTag> nextHopFaceIdTag = interest->getTag<lp::NextHopFaceIdTag>();
92  if (nextHopFaceIdTag != nullptr) {
93  packet.add<lp::NextHopFaceIdField>(*nextHopFaceIdTag);
94  }
95 
96  packet.add<lp::FragmentField>(std::make_pair(interest->wireEncode().begin(),
97  interest->wireEncode().end()));
98 
99  m_face.m_transport->send(packet.wireEncode());
100  }
101 
102  void
103  asyncRemovePendingInterest(const PendingInterestId* pendingInterestId)
104  {
105  m_pendingInterestTable.remove_if(MatchPendingInterestId(pendingInterestId));
106  }
107 
108  void
110  {
111  m_pendingInterestTable.clear();
112  }
113 
114  void
116  {
117  for (auto entry = m_pendingInterestTable.begin(); entry != m_pendingInterestTable.end(); ) {
118  if ((*entry)->getInterest()->matchesData(data)) {
119  shared_ptr<PendingInterest> matchedEntry = *entry;
120  entry = m_pendingInterestTable.erase(entry);
121  matchedEntry->invokeDataCallback(data);
122  }
123  else {
124  ++entry;
125  }
126  }
127  }
128 
129  void
131  {
132  for (auto entry = m_pendingInterestTable.begin(); entry != m_pendingInterestTable.end(); ) {
133  const Interest& pendingInterest = *(*entry)->getInterest();
134  if (pendingInterest == nack.getInterest()) {
135  shared_ptr<PendingInterest> matchedEntry = *entry;
136  entry = m_pendingInterestTable.erase(entry);
137  matchedEntry->invokeNackCallback(nack);
138  }
139  else {
140  ++entry;
141  }
142  }
143  }
144 
145 public: // producer
146  void
147  asyncSetInterestFilter(shared_ptr<InterestFilterRecord> interestFilterRecord)
148  {
149  m_interestFilterTable.push_back(interestFilterRecord);
150  }
151 
152  void
153  asyncUnsetInterestFilter(const InterestFilterId* interestFilterId)
154  {
155  InterestFilterTable::iterator i = std::find_if(m_interestFilterTable.begin(),
156  m_interestFilterTable.end(),
157  MatchInterestFilterId(interestFilterId));
158  if (i != m_interestFilterTable.end()) {
159  m_interestFilterTable.erase(i);
160  }
161  }
162 
163  void
165  {
166  for (const auto& filter : m_interestFilterTable) {
167  if (filter->doesMatch(interest.getName())) {
168  filter->invokeInterestCallback(interest);
169  }
170  }
171  }
172 
173  void
174  asyncSend(const Block& wire)
175  {
176  this->ensureConnected(true);
177  m_face.m_transport->send(wire);
178  }
179 
180 public: // prefix registration
181  const RegisteredPrefixId*
182  registerPrefix(const Name& prefix,
183  shared_ptr<InterestFilterRecord> filter,
184  const RegisterPrefixSuccessCallback& onSuccess,
185  const RegisterPrefixFailureCallback& onFailure,
186  uint64_t flags,
187  const nfd::CommandOptions& options)
188  {
189  nfd::ControlParameters params;
190  params.setName(prefix);
191  params.setFlags(flags);
192 
193  auto prefixToRegister = make_shared<RegisteredPrefix>(prefix, filter, options);
194 
195  m_face.m_nfdController->start<nfd::RibRegisterCommand>(
196  params,
197  [=] (const nfd::ControlParameters&) { this->afterPrefixRegistered(prefixToRegister, onSuccess); },
198  [=] (const nfd::ControlResponse& resp) { onFailure(prefixToRegister->getPrefix(), resp.getText()); },
199  options);
200 
201  return reinterpret_cast<const RegisteredPrefixId*>(prefixToRegister.get());
202  }
203 
204  void
205  afterPrefixRegistered(shared_ptr<RegisteredPrefix> registeredPrefix,
206  const RegisterPrefixSuccessCallback& onSuccess)
207  {
208  m_registeredPrefixTable.insert(registeredPrefix);
209 
210  if (registeredPrefix->getFilter() != nullptr) {
211  // it was a combined operation
212  m_interestFilterTable.push_back(registeredPrefix->getFilter());
213  }
214 
215  if (onSuccess != nullptr) {
216  onSuccess(registeredPrefix->getPrefix());
217  }
218  }
219 
220  void
221  asyncUnregisterPrefix(const RegisteredPrefixId* registeredPrefixId,
222  const UnregisterPrefixSuccessCallback& onSuccess,
223  const UnregisterPrefixFailureCallback& onFailure)
224  {
225  auto i = std::find_if(m_registeredPrefixTable.begin(),
226  m_registeredPrefixTable.end(),
227  MatchRegisteredPrefixId(registeredPrefixId));
228  if (i != m_registeredPrefixTable.end()) {
229  RegisteredPrefix& record = **i;
230  const shared_ptr<InterestFilterRecord>& filter = record.getFilter();
231 
232  if (filter != nullptr) {
233  // it was a combined operation
234  m_interestFilterTable.remove(filter);
235  }
236 
237  nfd::ControlParameters params;
238  params.setName(record.getPrefix());
239  m_face.m_nfdController->start<nfd::RibUnregisterCommand>(
240  params,
241  [=] (const nfd::ControlParameters&) { this->finalizeUnregisterPrefix(i, onSuccess); },
242  [=] (const nfd::ControlResponse& resp) { onFailure(resp.getText()); },
243  record.getCommandOptions());
244  }
245  else {
246  if (onFailure != nullptr) {
247  onFailure("Unrecognized PrefixId");
248  }
249  }
250 
251  // there cannot be two registered prefixes with the same id
252  }
253 
254  void
256  const UnregisterPrefixSuccessCallback& onSuccess)
257  {
258  m_registeredPrefixTable.erase(item);
259 
260  if (onSuccess != nullptr) {
261  onSuccess();
262  }
263  }
264 
265 public: // IO routine
266  void
267  ensureConnected(bool wantResume)
268  {
269  if (!m_face.m_transport->isConnected())
270  m_face.m_transport->connect(m_face.m_ioService,
271  [=] (const Block& wire) { m_face.onReceiveElement(wire); });
272 
273  if (wantResume && !m_face.m_transport->isReceiving()) {
274  m_face.m_transport->resume();
275  }
276  }
277 
278  void
280  {
281  if (m_pendingInterestTable.empty() && m_registeredPrefixTable.empty()) {
282  m_face.m_transport->pause();
283  if (!m_ioServiceWork) {
284  m_processEventsTimeoutEvent.cancel();
285  }
286  }
287  }
288 
289 private:
290  Face& m_face;
291  util::Scheduler m_scheduler;
292  util::scheduler::ScopedEventId m_processEventsTimeoutEvent;
293 
294  PendingInterestTable m_pendingInterestTable;
295  InterestFilterTable m_interestFilterTable;
296  RegisteredPrefixTable m_registeredPrefixTable;
297 
298  unique_ptr<boost::asio::io_service::work> m_ioServiceWork; // if thread needs to be preserved
299 
300  friend class Face;
301 };
302 
303 } // namespace ndn
304 
305 #endif // NDN_DETAIL_FACE_IMPL_HPP
size_t wireEncode(EncodingImpl< TAG > &encoder) const
append packet to encoder
Definition: packet.cpp:42
const Name & getName() const
Definition: interest.hpp:215
Copyright (c) 2013-2016 Regents of the University of California.
Definition: common.hpp:74
void satisfyPendingInterests(const Data &data)
Definition: face-impl.hpp:115
function< void(const std::string &)> UnregisterPrefixFailureCallback
Callback invoked when unregisterPrefix or unsetInterestFilter command fails.
Definition: face.hpp:120
void processInterestFilters(Interest &interest)
Definition: face-impl.hpp:164
represents parameters in a ControlCommand request or response
implementation detail of Face
Definition: face-impl.hpp:51
Packet & add(const typename FIELD::ValueType &value)
add a FIELD with value
Definition: packet.hpp:156
void asyncExpressInterest(shared_ptr< const Interest > interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Definition: face-impl.hpp:78
const Interest & getInterest() const
Definition: nack.hpp:53
ControlParameters & setFlags(uint64_t flags)
std::list< shared_ptr< InterestFilterRecord > > InterestFilterTable
Definition: face-impl.hpp:55
A container that emits onEmpty signal when it becomes empty.
const Name & getPrefix() const
Class representing a wire element of NDN-TLV packet format.
Definition: block.hpp:43
represents an Interest packet
Definition: interest.hpp:42
Functor to match PendingInterestId.
void asyncRemovePendingInterest(const PendingInterestId *pendingInterestId)
Definition: face-impl.hpp:103
const shared_ptr< InterestFilterRecord > & getFilter() const
stores information about a prefix registered in NDN forwarder
void asyncRemoveAllPendingInterests()
Definition: face-impl.hpp:109
Impl(Face &face)
Definition: face-impl.hpp:59
void asyncUnsetInterestFilter(const InterestFilterId *interestFilterId)
Definition: face-impl.hpp:153
represents a Network Nack
Definition: nack.hpp:40
provides a tag type for simple types
Definition: tag.hpp:58
void onEmptyPitOrNoRegisteredPrefixes()
Definition: face-impl.hpp:279
void finalizeUnregisterPrefix(RegisteredPrefixTable::iterator item, const UnregisterPrefixSuccessCallback &onSuccess)
Definition: face-impl.hpp:255
contains options for ControlCommand execution
std::pair< iterator, bool > insert(const value_type &value)
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:125
Functor to match InterestFilterId.
void ensureConnected(bool wantResume)
Definition: face-impl.hpp:267
function< void(const Name &, const std::string &)> RegisterPrefixFailureCallback
Callback invoked when registerPrefix or setInterestFilter command fails.
Definition: face.hpp:110
function< void(const Name &)> RegisterPrefixSuccessCallback
Callback invoked when registerPrefix or setInterestFilter command succeeds.
Definition: face.hpp:105
util::Signal< ContainerWithOnEmptySignal< T > > onEmpty
Signal to be fired when container becomes empty.
Name abstraction to represent an absolute name.
Definition: name.hpp:46
represents a rib/unregister command
boost::asio::io_service & getIoService()
Definition: face.hpp:699
represents a rib/register command
Event that is automatically cancelled upon destruction.
void cancel()
cancels the event manually
void nackPendingInterests(const lp::Nack &nack)
Definition: face-impl.hpp:130
void afterPrefixRegistered(shared_ptr< RegisteredPrefix > registeredPrefix, const RegisterPrefixSuccessCallback &onSuccess)
Definition: face-impl.hpp:205
void asyncSend(const Block &wire)
Definition: face-impl.hpp:174
ControlParameters & setName(const Name &name)
function< void()> UnregisterPrefixSuccessCallback
Callback invoked when unregisterPrefix or unsetInterestFilter command succeeds.
Definition: face.hpp:115
Functor to match RegisteredPrefixId.
ControlCommand response.
const nfd::CommandOptions & getCommandOptions() const
function< void(const Interest &)> TimeoutCallback
Callback invoked when expressed Interest times out.
Definition: face.hpp:77
ContainerWithOnEmptySignal< shared_ptr< RegisteredPrefix > > RegisteredPrefixTable
Definition: face-impl.hpp:56
function< void(const Interest &, const lp::Nack &)> NackCallback
Callback invoked when Nack is sent in response to expressed Interest.
Definition: face.hpp:72
represents a Data packet
Definition: data.hpp:37
void asyncUnregisterPrefix(const RegisteredPrefixId *registeredPrefixId, const UnregisterPrefixSuccessCallback &onSuccess, const UnregisterPrefixFailureCallback &onFailure)
Definition: face-impl.hpp:221
void asyncSetInterestFilter(shared_ptr< InterestFilterRecord > interestFilterRecord)
Definition: face-impl.hpp:147
ContainerWithOnEmptySignal< shared_ptr< PendingInterest > > PendingInterestTable
Definition: face-impl.hpp:54
function< void(const Interest &, const Data &)> DataCallback
Callback invoked when expressed Interest gets satisfied with a Data packet.
Definition: face.hpp:67
const RegisteredPrefixId * registerPrefix(const Name &prefix, shared_ptr< InterestFilterRecord > filter, const RegisterPrefixSuccessCallback &onSuccess, const RegisterPrefixFailureCallback &onFailure, uint64_t flags, const nfd::CommandOptions &options)
Definition: face-impl.hpp:182