28 #include <ndn-cxx/lp/tags.hpp> 40 : allowLocalFields(false)
41 , allowFragmentation(false)
42 , allowReassembly(false)
43 , allowCongestionMarking(false)
44 , baseCongestionMarkingInterval(time::milliseconds(100))
45 , defaultCongestionThreshold(65536)
46 , allowSelfLearning(false)
56 , m_nextMarkTime(time::steady_clock::TimePoint::max())
57 , m_lastMarkTime(time::steady_clock::TimePoint::min())
58 , m_nMarkedSinceInMarkingState(0)
75 GenericLinkService::requestIdlePacket()
79 this->sendLpPacket({});
83 GenericLinkService::sendLpPacket(lp::Packet&& pkt)
92 checkCongestionLevel(pkt);
96 if (mtu !=
MTU_UNLIMITED && tp.packet.size() >
static_cast<size_t>(mtu)) {
105 GenericLinkService::doSendInterest(
const Interest& interest)
107 lp::Packet lpPacket(interest.wireEncode());
109 encodeLpFields(interest, lpPacket);
111 this->sendNetPacket(std::move(lpPacket),
true);
115 GenericLinkService::doSendData(
const Data& data)
117 lp::Packet lpPacket(data.wireEncode());
119 encodeLpFields(data, lpPacket);
121 this->sendNetPacket(std::move(lpPacket),
false);
125 GenericLinkService::doSendNack(
const lp::Nack& nack)
127 lp::Packet lpPacket(nack.getInterest().wireEncode());
128 lpPacket.add<lp::NackField>(nack.getHeader());
130 encodeLpFields(nack, lpPacket);
132 this->sendNetPacket(std::move(lpPacket),
false);
136 GenericLinkService::encodeLpFields(
const ndn::PacketBase& netPkt, lp::Packet& lpPacket)
139 shared_ptr<lp::IncomingFaceIdTag> incomingFaceIdTag = netPkt.getTag<lp::IncomingFaceIdTag>();
140 if (incomingFaceIdTag !=
nullptr) {
141 lpPacket.add<lp::IncomingFaceIdField>(*incomingFaceIdTag);
145 shared_ptr<lp::CongestionMarkTag> congestionMarkTag = netPkt.getTag<lp::CongestionMarkTag>();
146 if (congestionMarkTag !=
nullptr) {
147 lpPacket.add<lp::CongestionMarkField>(*congestionMarkTag);
151 shared_ptr<lp::NonDiscoveryTag> nonDiscoveryTag = netPkt.getTag<lp::NonDiscoveryTag>();
152 if (nonDiscoveryTag !=
nullptr) {
153 lpPacket.add<lp::NonDiscoveryField>(*nonDiscoveryTag);
156 shared_ptr<lp::PrefixAnnouncementTag> prefixAnnouncementTag = netPkt.getTag<lp::PrefixAnnouncementTag>();
157 if (prefixAnnouncementTag !=
nullptr) {
158 lpPacket.add<lp::PrefixAnnouncementField>(*prefixAnnouncementTag);
164 GenericLinkService::sendNetPacket(lp::Packet&& pkt,
bool isInterest)
166 std::vector<lp::Packet> frags;
175 mtu -= CONGESTION_MARK_SIZE;
191 frags.push_back(pkt);
194 frags.push_back(std::move(pkt));
198 if (frags.size() == 1) {
201 BOOST_ASSERT(!frags.front().has<lp::FragIndexField>());
202 BOOST_ASSERT(!frags.front().has<lp::FragCountField>());
206 if (frags.size() > 1) {
208 this->assignSequences(frags);
215 for (lp::Packet& frag : frags) {
216 this->sendLpPacket(std::move(frag));
221 GenericLinkService::assignSequence(lp::Packet& pkt)
223 pkt.set<lp::SequenceField>(++m_lastSeqNo);
227 GenericLinkService::assignSequences(std::vector<lp::Packet>& pkts)
229 std::for_each(pkts.begin(), pkts.end(), bind(&GenericLinkService::assignSequence,
this, _1));
233 GenericLinkService::checkCongestionLevel(lp::Packet& pkt)
237 if (sendQueueLength < 0) {
245 congestionThreshold = std::min(congestionThreshold,
246 static_cast<size_t>(
getTransport()->getSendQueueCapacity()) /
247 DEFAULT_CONGESTION_THRESHOLD_DIVISOR);
250 if (sendQueueLength > 0) {
251 NFD_LOG_FACE_TRACE(
"txqlen=" << sendQueueLength <<
" threshold=" << congestionThreshold <<
252 " capacity=" <<
getTransport()->getSendQueueCapacity());
255 if (static_cast<size_t>(sendQueueLength) > congestionThreshold) {
256 const auto now = time::steady_clock::now();
259 if (m_nMarkedSinceInMarkingState == 0) {
260 m_nextMarkTime = now;
264 pkt.set<lp::CongestionMarkField>(1);
268 ++m_nMarkedSinceInMarkingState;
271 m_nextMarkTime += time::nanoseconds(static_cast<time::nanoseconds::rep>(
273 std::sqrt(m_nMarkedSinceInMarkingState)));
274 m_lastMarkTime = now;
277 else if (m_nextMarkTime != time::steady_clock::TimePoint::max()) {
280 m_nextMarkTime = time::steady_clock::TimePoint::max();
281 m_nMarkedSinceInMarkingState = 0;
289 lp::Packet pkt(packet.packet);
295 if (!pkt.has<lp::FragmentField>()) {
300 if ((pkt.has<lp::FragIndexField>() || pkt.has<lp::FragCountField>()) &&
306 bool isReassembled =
false;
309 std::tie(isReassembled, netPkt, firstPkt) = m_reassembler.
receiveFragment(packet.remoteEndpoint,
312 this->decodeNetPacket(netPkt, firstPkt);
315 catch (
const tlv::Error& e) {
322 GenericLinkService::decodeNetPacket(
const Block& netPkt,
const lp::Packet& firstPkt)
325 switch (netPkt.type()) {
327 if (firstPkt.has<lp::NackField>()) {
328 this->decodeNack(netPkt, firstPkt);
331 this->decodeInterest(netPkt, firstPkt);
335 this->decodeData(netPkt, firstPkt);
339 NFD_LOG_FACE_WARN(
"unrecognized network-layer packet TLV-TYPE " << netPkt.type() <<
": DROP");
343 catch (
const tlv::Error& e) {
350 GenericLinkService::decodeInterest(
const Block& netPkt,
const lp::Packet& firstPkt)
352 BOOST_ASSERT(netPkt.type() == tlv::Interest);
353 BOOST_ASSERT(!firstPkt.has<lp::NackField>());
356 auto interest = make_shared<Interest>(netPkt);
358 if (firstPkt.has<lp::NextHopFaceIdField>()) {
360 interest->setTag(make_shared<lp::NextHopFaceIdTag>(firstPkt.get<lp::NextHopFaceIdField>()));
368 if (firstPkt.has<lp::CachePolicyField>()) {
374 if (firstPkt.has<lp::IncomingFaceIdField>()) {
378 if (firstPkt.has<lp::CongestionMarkField>()) {
379 interest->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
382 if (firstPkt.has<lp::NonDiscoveryField>()) {
384 interest->setTag(make_shared<lp::NonDiscoveryTag>(firstPkt.get<lp::NonDiscoveryField>()));
391 if (firstPkt.has<lp::PrefixAnnouncementField>()) {
401 GenericLinkService::decodeData(
const Block& netPkt,
const lp::Packet& firstPkt)
403 BOOST_ASSERT(netPkt.type() == tlv::Data);
406 auto data = make_shared<Data>(netPkt);
408 if (firstPkt.has<lp::NackField>()) {
414 if (firstPkt.has<lp::NextHopFaceIdField>()) {
420 if (firstPkt.has<lp::CachePolicyField>()) {
424 data->setTag(make_shared<lp::CachePolicyTag>(firstPkt.get<lp::CachePolicyField>()));
427 if (firstPkt.has<lp::IncomingFaceIdField>()) {
431 if (firstPkt.has<lp::CongestionMarkField>()) {
432 data->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
435 if (firstPkt.has<lp::NonDiscoveryField>()) {
441 if (firstPkt.has<lp::PrefixAnnouncementField>()) {
443 data->setTag(make_shared<lp::PrefixAnnouncementTag>(firstPkt.get<lp::PrefixAnnouncementField>()));
446 NFD_LOG_FACE_WARN(
"received PrefixAnnouncement, but self-learning disabled: IGNORE");
454 GenericLinkService::decodeNack(
const Block& netPkt,
const lp::Packet& firstPkt)
456 BOOST_ASSERT(netPkt.type() == tlv::Interest);
457 BOOST_ASSERT(firstPkt.has<lp::NackField>());
459 lp::Nack nack((Interest(netPkt)));
460 nack.setHeader(firstPkt.get<lp::NackField>());
462 if (firstPkt.has<lp::NextHopFaceIdField>()) {
468 if (firstPkt.has<lp::CachePolicyField>()) {
474 if (firstPkt.has<lp::IncomingFaceIdField>()) {
478 if (firstPkt.has<lp::CongestionMarkField>()) {
479 nack.setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
482 if (firstPkt.has<lp::NonDiscoveryField>()) {
488 if (firstPkt.has<lp::PrefixAnnouncementField>()) {
void setOptions(const Options &options)
sets Options used by GenericLinkService
void processIncomingPacket(const lp::Packet &pkt)
extract and parse all Acks and add Ack for contained Fragment (if any) to AckQueue ...
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
void receiveData(const Data &data)
delivers received Data to forwarding
SizeCounter< LpReassembler > nReassembling
count of network-layer packets currently being reassembled
PacketCounter nInLpInvalid
count of invalid LpPackets dropped before reassembly
const ssize_t MTU_UNLIMITED
indicates the transport has no limit on payload size
#define NFD_LOG_FACE_DEBUG(msg)
Log a message at DEBUG level.
PacketCounter nFragmentationErrors
count of failed fragmentations
bool allowFragmentation
enables fragmentation
void piggyback(lp::Packet &pkt, ssize_t mtu)
called by GenericLinkService to attach Acks onto an outgoing LpPacket
bool isEnabled
enables link-layer reliability
void setOptions(const Options &options)
set options for fragmenter
PacketCounter nInNetInvalid
count of invalid reassembled network-layer packets dropped
stores a packet along with the remote endpoint
LpReassembler::Options reassemblerOptions
options for reassembly
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
static constexpr size_t RESERVED_HEADER_SPACE
TxSequence TLV-TYPE (3 octets) + TxSequence TLV-LENGTH (1 octet) + sizeof(lp::Sequence) ...
bool allowSelfLearning
enables self-learning forwarding support
const Transport * getTransport() const
PacketCounter nCongestionMarked
count of outgoing LpPackets that were marked with congestion marks
Copyright (c) 2014-2015, Regents of the University of California, Arizona Board of Regents...
signal::Signal< LpReassembler, Transport::EndpointId, size_t > beforeTimeout
signals before a partial packet is dropped due to timeout
void sendPacket(Transport::Packet &&packet)
sends a lower-layer packet via Transport
std::tuple< bool, std::vector< lp::Packet > > fragmentPacket(const lp::Packet &packet, size_t mtu)
fragments a network-layer packet into link-layer packets
constexpr uint32_t DEFAULT_CONGESTION_THRESHOLD_DIVISOR
void receiveNack(const lp::Nack &nack)
delivers received Nack to forwarding
virtual ssize_t getSendQueueLength()
PacketCounter nOutOverMtu
count of outgoing LpPackets dropped due to exceeding MTU limit
LpReliability::Options reliabilityOptions
options for reliability
void handleOutgoing(std::vector< lp::Packet > &frags, lp::Packet &&pkt, bool isInterest)
observe outgoing fragment(s) of a network packet and store for potential retransmission ...
Options that control the behavior of GenericLinkService.
void setOptions(const Options &options)
set options for reliability
GenericLinkService(const Options &options=Options())
#define NFD_LOG_INIT(name)
signal::Signal< LpReliability, Interest > onDroppedInterest
signals on Interest dropped by reliability system for exceeding allowed number of retx ...
std::tuple< bool, Block, lp::Packet > receiveFragment(Transport::EndpointId remoteEndpoint, const lp::Packet &packet)
adds received fragment to buffer
time::nanoseconds baseCongestionMarkingInterval
starting value for congestion marking interval
size_t defaultCongestionThreshold
default congestion threshold in bytes
void notifyDroppedInterest(const Interest &packet)
bool allowLocalFields
enables encoding of IncomingFaceId, and decoding of NextHopFaceId and CachePolicy ...
bool allowCongestionMarking
enables send queue congestion detection and marking
void receiveInterest(const Interest &interest)
delivers received Interest to forwarding
bool allowReassembly
enables reassembly
PacketCounter nReassemblyTimeouts
count of dropped partial network-layer packets due to reassembly timeout
void setOptions(const Options &options)
set options for reassembler
LpFragmenter::Options fragmenterOptions
options for fragmentation