face-impl.hpp
Go to the documentation of this file.
1 /* -*- Mode:C++; c-file-style:"gnu"; indent-tabs-mode:nil; -*- */
22 #ifndef NDN_DETAIL_FACE_IMPL_HPP
23 #define NDN_DETAIL_FACE_IMPL_HPP
24 
#include "../common.hpp"
#include "../face.hpp"

#include "registered-prefix.hpp"
#include "pending-interest.hpp"
// NOTE(review): restored include (the listing skips its line 30). It is expected to declare
// ContainerWithOnEmptySignal, which the PendingInterestTable / RegisteredPrefixTable typedefs
// need - confirm the header name against the source tree.
#include "container-with-on-empty-signal.hpp"

#include "../util/scheduler.hpp"
#include "../util/config-file.hpp"
#include "../util/signal.hpp"

#include "../transport/transport.hpp"
#include "../transport/unix-transport.hpp"
#include "../transport/tcp-transport.hpp"

#include "../mgmt/nfd/controller.hpp"
#include "../mgmt/nfd/command-options.hpp"

#include "../lp/packet.hpp"
#include "../lp/tags.hpp"
45 
46 namespace ndn {
47 
51 class Face::Impl : noncopyable
52 {
53 public:
55  typedef std::list<shared_ptr<InterestFilterRecord>> InterestFilterTable;
57 
58  explicit
59  Impl(Face& face)
60  : m_face(face)
61  , m_scheduler(m_face.getIoService())
62  , m_processEventsTimeoutEvent(m_scheduler)
63  {
64  auto postOnEmptyPitOrNoRegisteredPrefixes = [this] {
65  this->m_face.getIoService().post([this] { this->onEmptyPitOrNoRegisteredPrefixes(); });
66  // without this extra "post", transport can get paused (-async_read) and then resumed
67  // (+async_read) from within onInterest/onData callback. After onInterest/onData
68  // finishes, there is another +async_read with the same memory block. A few of such
69  // async_read duplications can cause various effects and result in segfault.
70  };
71 
72  m_pendingInterestTable.onEmpty.connect(postOnEmptyPitOrNoRegisteredPrefixes);
73  m_registeredPrefixTable.onEmpty.connect(postOnEmptyPitOrNoRegisteredPrefixes);
74  }
75 
76 public: // consumer
77  void
78  asyncExpressInterest(shared_ptr<const Interest> interest,
79  const DataCallback& afterSatisfied,
80  const NackCallback& afterNacked,
81  const TimeoutCallback& afterTimeout)
82  {
83  this->ensureConnected(true);
84 
85  auto entry = m_pendingInterestTable.insert(make_shared<PendingInterest>(
86  interest, afterSatisfied, afterNacked, afterTimeout, ref(m_scheduler))).first;
87  (*entry)->setDeleter([this, entry] { m_pendingInterestTable.erase(entry); });
88 
89  lp::Packet packet;
90 
91  shared_ptr<lp::NextHopFaceIdTag> nextHopFaceIdTag = interest->getTag<lp::NextHopFaceIdTag>();
92  if (nextHopFaceIdTag != nullptr) {
93  packet.add<lp::NextHopFaceIdField>(*nextHopFaceIdTag);
94  }
95 
96  shared_ptr<lp::CongestionMarkTag> congestionMarkTag = interest->getTag<lp::CongestionMarkTag>();
97  if (congestionMarkTag != nullptr) {
98  packet.add<lp::CongestionMarkField>(*congestionMarkTag);
99  }
100 
101  packet.add<lp::FragmentField>(std::make_pair(interest->wireEncode().begin(),
102  interest->wireEncode().end()));
103 
104  m_face.m_transport->send(packet.wireEncode());
105  }
106 
107  void
108  asyncRemovePendingInterest(const PendingInterestId* pendingInterestId)
109  {
110  m_pendingInterestTable.remove_if(MatchPendingInterestId(pendingInterestId));
111  }
112 
113  void
115  {
116  m_pendingInterestTable.clear();
117  }
118 
119  void
121  {
122  for (auto entry = m_pendingInterestTable.begin(); entry != m_pendingInterestTable.end(); ) {
123  if ((*entry)->getInterest()->matchesData(data)) {
124  shared_ptr<PendingInterest> matchedEntry = *entry;
125  entry = m_pendingInterestTable.erase(entry);
126  matchedEntry->invokeDataCallback(data);
127  }
128  else {
129  ++entry;
130  }
131  }
132  }
133 
134  void
136  {
137  for (auto entry = m_pendingInterestTable.begin(); entry != m_pendingInterestTable.end(); ) {
138  const Interest& pendingInterest = *(*entry)->getInterest();
139  if (nack.getInterest().matchesInterest(pendingInterest)) {
140  shared_ptr<PendingInterest> matchedEntry = *entry;
141  entry = m_pendingInterestTable.erase(entry);
142  matchedEntry->invokeNackCallback(nack);
143  }
144  else {
145  ++entry;
146  }
147  }
148  }
149 
150 public: // producer
151  void
152  asyncSetInterestFilter(shared_ptr<InterestFilterRecord> interestFilterRecord)
153  {
154  m_interestFilterTable.push_back(interestFilterRecord);
155  }
156 
157  void
158  asyncUnsetInterestFilter(const InterestFilterId* interestFilterId)
159  {
160  InterestFilterTable::iterator i = std::find_if(m_interestFilterTable.begin(),
161  m_interestFilterTable.end(),
162  MatchInterestFilterId(interestFilterId));
163  if (i != m_interestFilterTable.end()) {
164  m_interestFilterTable.erase(i);
165  }
166  }
167 
168  void
170  {
171  for (const auto& filter : m_interestFilterTable) {
172  if (filter->doesMatch(interest.getName())) {
173  filter->invokeInterestCallback(interest);
174  }
175  }
176  }
177 
178  void
179  asyncSend(const Block& wire)
180  {
181  this->ensureConnected(true);
182  m_face.m_transport->send(wire);
183  }
184 
185 public: // prefix registration
186  const RegisteredPrefixId*
187  registerPrefix(const Name& prefix,
188  shared_ptr<InterestFilterRecord> filter,
189  const RegisterPrefixSuccessCallback& onSuccess,
190  const RegisterPrefixFailureCallback& onFailure,
191  uint64_t flags,
192  const nfd::CommandOptions& options)
193  {
194  nfd::ControlParameters params;
195  params.setName(prefix);
196  params.setFlags(flags);
197 
198  auto prefixToRegister = make_shared<RegisteredPrefix>(prefix, filter, options);
199 
200  m_face.m_nfdController->start<nfd::RibRegisterCommand>(
201  params,
202  [=] (const nfd::ControlParameters&) { this->afterPrefixRegistered(prefixToRegister, onSuccess); },
203  [=] (const nfd::ControlResponse& resp) { onFailure(prefixToRegister->getPrefix(), resp.getText()); },
204  options);
205 
206  return reinterpret_cast<const RegisteredPrefixId*>(prefixToRegister.get());
207  }
208 
209  void
210  afterPrefixRegistered(shared_ptr<RegisteredPrefix> registeredPrefix,
211  const RegisterPrefixSuccessCallback& onSuccess)
212  {
213  m_registeredPrefixTable.insert(registeredPrefix);
214 
215  if (registeredPrefix->getFilter() != nullptr) {
216  // it was a combined operation
217  m_interestFilterTable.push_back(registeredPrefix->getFilter());
218  }
219 
220  if (onSuccess != nullptr) {
221  onSuccess(registeredPrefix->getPrefix());
222  }
223  }
224 
225  void
226  asyncUnregisterPrefix(const RegisteredPrefixId* registeredPrefixId,
227  const UnregisterPrefixSuccessCallback& onSuccess,
228  const UnregisterPrefixFailureCallback& onFailure)
229  {
230  auto i = std::find_if(m_registeredPrefixTable.begin(),
231  m_registeredPrefixTable.end(),
232  MatchRegisteredPrefixId(registeredPrefixId));
233  if (i != m_registeredPrefixTable.end()) {
234  RegisteredPrefix& record = **i;
235  const shared_ptr<InterestFilterRecord>& filter = record.getFilter();
236 
237  if (filter != nullptr) {
238  // it was a combined operation
239  m_interestFilterTable.remove(filter);
240  }
241 
242  nfd::ControlParameters params;
243  params.setName(record.getPrefix());
244  m_face.m_nfdController->start<nfd::RibUnregisterCommand>(
245  params,
246  [=] (const nfd::ControlParameters&) { this->finalizeUnregisterPrefix(i, onSuccess); },
247  [=] (const nfd::ControlResponse& resp) { onFailure(resp.getText()); },
248  record.getCommandOptions());
249  }
250  else {
251  if (onFailure != nullptr) {
252  onFailure("Unrecognized PrefixId");
253  }
254  }
255 
256  // there cannot be two registered prefixes with the same id
257  }
258 
259  void
261  const UnregisterPrefixSuccessCallback& onSuccess)
262  {
263  m_registeredPrefixTable.erase(item);
264 
265  if (onSuccess != nullptr) {
266  onSuccess();
267  }
268  }
269 
270 public: // IO routine
271  void
272  ensureConnected(bool wantResume)
273  {
274  if (!m_face.m_transport->isConnected())
275  m_face.m_transport->connect(m_face.m_ioService,
276  [=] (const Block& wire) { m_face.onReceiveElement(wire); });
277 
278  if (wantResume && !m_face.m_transport->isReceiving()) {
279  m_face.m_transport->resume();
280  }
281  }
282 
283  void
285  {
286  if (m_pendingInterestTable.empty() && m_registeredPrefixTable.empty()) {
287  m_face.m_transport->pause();
288  if (!m_ioServiceWork) {
289  m_processEventsTimeoutEvent.cancel();
290  }
291  }
292  }
293 
294 private:
295  Face& m_face;
296  util::Scheduler m_scheduler;
297  util::scheduler::ScopedEventId m_processEventsTimeoutEvent;
298 
299  PendingInterestTable m_pendingInterestTable;
300  InterestFilterTable m_interestFilterTable;
301  RegisteredPrefixTable m_registeredPrefixTable;
302 
303  unique_ptr<boost::asio::io_service::work> m_ioServiceWork; // if thread needs to be preserved
304 
305  friend class Face;
306 };
307 
308 } // namespace ndn
309 
310 #endif // NDN_DETAIL_FACE_IMPL_HPP
size_t wireEncode(EncodingImpl< TAG > &encoder) const
append packet to encoder
Definition: packet.cpp:42
const Name & getName() const
Definition: interest.hpp:226
Copyright (c) 2013-2016 Regents of the University of California.
Definition: common.hpp:74
void satisfyPendingInterests(const Data &data)
Definition: face-impl.hpp:120
function< void(const std::string &)> UnregisterPrefixFailureCallback
Callback invoked when unregisterPrefix or unsetInterestFilter command fails.
Definition: face.hpp:116
void processInterestFilters(Interest &interest)
Definition: face-impl.hpp:169
represents parameters in a ControlCommand request or response
implementation detail of Face
Definition: face-impl.hpp:51
Packet & add(const typename FIELD::ValueType &value)
add a FIELD with value
Definition: packet.hpp:156
void asyncExpressInterest(shared_ptr< const Interest > interest, const DataCallback &afterSatisfied, const NackCallback &afterNacked, const TimeoutCallback &afterTimeout)
Definition: face-impl.hpp:78
const Interest & getInterest() const
Definition: nack.hpp:53
ControlParameters & setFlags(uint64_t flags)
std::list< shared_ptr< InterestFilterRecord > > InterestFilterTable
Definition: face-impl.hpp:55
A container that emits onEmpty signal when it becomes empty.
const Name & getPrefix() const
Class representing a wire element of NDN-TLV packet format.
Definition: block.hpp:43
represents an Interest packet
Definition: interest.hpp:42
Functor to match PendingInterestId.
void asyncRemovePendingInterest(const PendingInterestId *pendingInterestId)
Definition: face-impl.hpp:108
const shared_ptr< InterestFilterRecord > & getFilter() const
stores information about a prefix registered in NDN forwarder
void asyncRemoveAllPendingInterests()
Definition: face-impl.hpp:114
Impl(Face &face)
Definition: face-impl.hpp:59
void asyncUnsetInterestFilter(const InterestFilterId *interestFilterId)
Definition: face-impl.hpp:158
represents a Network Nack
Definition: nack.hpp:40
provides a tag type for simple types
Definition: tag.hpp:58
void onEmptyPitOrNoRegisteredPrefixes()
Definition: face-impl.hpp:284
void finalizeUnregisterPrefix(RegisteredPrefixTable::iterator item, const UnregisterPrefixSuccessCallback &onSuccess)
Definition: face-impl.hpp:260
contains options for ControlCommand execution
std::pair< iterator, bool > insert(const value_type &value)
Provide a communication channel with local or remote NDN forwarder.
Definition: face.hpp:121
Functor to match InterestFilterId.
void ensureConnected(bool wantResume)
Definition: face-impl.hpp:272
function< void(const Name &, const std::string &)> RegisterPrefixFailureCallback
Callback invoked when registerPrefix or setInterestFilter command fails.
Definition: face.hpp:106
function< void(const Name &)> RegisterPrefixSuccessCallback
Callback invoked when registerPrefix or setInterestFilter command succeeds.
Definition: face.hpp:101
util::Signal< ContainerWithOnEmptySignal< T > > onEmpty
Signal to be fired when container becomes empty.
Name abstraction to represent an absolute name.
Definition: name.hpp:46
represents a rib/unregister command
boost::asio::io_service & getIoService()
Definition: face.hpp:696
represents a rib/register command
Event that is automatically cancelled upon destruction.
bool matchesInterest(const Interest &other) const
Check if Interest matches other interest.
Definition: interest.cpp:216
void cancel()
cancels the event manually
void nackPendingInterests(const lp::Nack &nack)
Definition: face-impl.hpp:135
void afterPrefixRegistered(shared_ptr< RegisteredPrefix > registeredPrefix, const RegisterPrefixSuccessCallback &onSuccess)
Definition: face-impl.hpp:210
void asyncSend(const Block &wire)
Definition: face-impl.hpp:179
ControlParameters & setName(const Name &name)
function< void()> UnregisterPrefixSuccessCallback
Callback invoked when unregisterPrefix or unsetInterestFilter command succeeds.
Definition: face.hpp:111
Functor to match RegisteredPrefixId.
ControlCommand response.
const nfd::CommandOptions & getCommandOptions() const
function< void(const Interest &)> TimeoutCallback
Callback invoked when expressed Interest times out.
Definition: face.hpp:73
ContainerWithOnEmptySignal< shared_ptr< RegisteredPrefix > > RegisteredPrefixTable
Definition: face-impl.hpp:56
function< void(const Interest &, const lp::Nack &)> NackCallback
Callback invoked when Nack is sent in response to expressed Interest.
Definition: face.hpp:68
represents a Data packet
Definition: data.hpp:37
void asyncUnregisterPrefix(const RegisteredPrefixId *registeredPrefixId, const UnregisterPrefixSuccessCallback &onSuccess, const UnregisterPrefixFailureCallback &onFailure)
Definition: face-impl.hpp:226
void asyncSetInterestFilter(shared_ptr< InterestFilterRecord > interestFilterRecord)
Definition: face-impl.hpp:152
ContainerWithOnEmptySignal< shared_ptr< PendingInterest > > PendingInterestTable
Definition: face-impl.hpp:54
function< void(const Interest &, const Data &)> DataCallback
Callback invoked when expressed Interest gets satisfied with a Data packet.
Definition: face.hpp:63
const RegisteredPrefixId * registerPrefix(const Name &prefix, shared_ptr< InterestFilterRecord > filter, const RegisterPrefixSuccessCallback &onSuccess, const RegisterPrefixFailureCallback &onFailure, uint64_t flags, const nfd::CommandOptions &options)
Definition: face-impl.hpp:187