28 #include <ndn-cxx/lp/pit-token.hpp>
29 #include <ndn-cxx/lp/tags.hpp>
38 tlv::sizeOfVarNumber(
sizeof(uint64_t)) +
39 tlv::sizeOfNonNegativeInteger(UINT64_MAX);
43 , m_fragmenter(m_options.fragmenterOptions, this)
44 , m_reassembler(m_options.reassemblerOptions, this)
45 , m_reliability(m_options.reliabilityOptions, this)
47 , m_nextMarkTime(time::steady_clock::time_point::max())
48 , m_nMarkedSinceInMarkingState(0)
84 GenericLinkService::requestIdlePacket()
89 this->sendLpPacket({});
93 GenericLinkService::sendLpPacket(lp::Packet&& pkt)
102 checkCongestionLevel(pkt);
105 auto block = pkt.wireEncode();
106 if (mtu !=
MTU_UNLIMITED && block.size() >
static_cast<size_t>(mtu)) {
115 GenericLinkService::doSendInterest(
const Interest& interest)
117 lp::Packet lpPacket(interest.wireEncode());
119 encodeLpFields(interest, lpPacket);
121 this->sendNetPacket(std::move(lpPacket),
true);
125 GenericLinkService::doSendData(
const Data& data)
127 lp::Packet lpPacket(data.wireEncode());
129 encodeLpFields(data, lpPacket);
131 this->sendNetPacket(std::move(lpPacket),
false);
135 GenericLinkService::doSendNack(
const lp::Nack& nack)
137 lp::Packet lpPacket(nack.getInterest().wireEncode());
138 lpPacket.add<lp::NackField>(nack.getHeader());
140 encodeLpFields(nack, lpPacket);
142 this->sendNetPacket(std::move(lpPacket),
false);
146 GenericLinkService::assignSequences(std::vector<lp::Packet>& pkts)
148 std::for_each(pkts.begin(), pkts.end(), [
this] (lp::Packet& pkt) {
149 pkt.set<lp::SequenceField>(++m_lastSeqNo);
154 GenericLinkService::encodeLpFields(
const ndn::PacketBase& netPkt, lp::Packet& lpPacket)
157 auto incomingFaceIdTag = netPkt.getTag<lp::IncomingFaceIdTag>();
158 if (incomingFaceIdTag !=
nullptr) {
159 lpPacket.add<lp::IncomingFaceIdField>(*incomingFaceIdTag);
163 auto congestionMarkTag = netPkt.getTag<lp::CongestionMarkTag>();
164 if (congestionMarkTag !=
nullptr) {
165 lpPacket.add<lp::CongestionMarkField>(*congestionMarkTag);
169 auto nonDiscoveryTag = netPkt.getTag<lp::NonDiscoveryTag>();
170 if (nonDiscoveryTag !=
nullptr) {
171 lpPacket.add<lp::NonDiscoveryField>(*nonDiscoveryTag);
174 auto prefixAnnouncementTag = netPkt.getTag<lp::PrefixAnnouncementTag>();
175 if (prefixAnnouncementTag !=
nullptr) {
176 lpPacket.add<lp::PrefixAnnouncementField>(*prefixAnnouncementTag);
180 auto pitToken = netPkt.getTag<lp::PitToken>();
181 if (pitToken !=
nullptr) {
182 lpPacket.add<lp::PitTokenField>(*pitToken);
187 GenericLinkService::sendNetPacket(lp::Packet&& pkt,
bool isInterest)
189 std::vector<lp::Packet> frags;
215 frags.push_back(pkt);
218 frags.push_back(std::move(pkt));
222 if (frags.size() == 1) {
225 BOOST_ASSERT(!frags.front().has<lp::FragIndexField>());
226 BOOST_ASSERT(!frags.front().has<lp::FragCountField>());
232 this->assignSequences(frags);
239 for (lp::Packet& frag : frags) {
240 this->sendLpPacket(std::move(frag));
245 GenericLinkService::checkCongestionLevel(lp::Packet& pkt)
249 if (sendQueueLength < 0) {
253 if (sendQueueLength > 0) {
261 const auto now = time::steady_clock::now();
263 if (m_nextMarkTime == time::steady_clock::time_point::max()) {
267 else if (now >= m_nextMarkTime) {
268 pkt.set<lp::CongestionMarkField>(1);
272 ++m_nMarkedSinceInMarkingState;
275 time::nanoseconds interval(
static_cast<time::nanoseconds::rep
>(
277 std::sqrt(m_nMarkedSinceInMarkingState + 1)));
278 m_nextMarkTime += interval;
281 else if (m_nextMarkTime != time::steady_clock::time_point::max()) {
284 m_nextMarkTime = time::steady_clock::time_point::max();
285 m_nMarkedSinceInMarkingState = 0;
290 GenericLinkService::doReceivePacket(
const Block& packet,
const EndpointId& endpoint)
293 lp::Packet pkt(packet);
303 if (!pkt.has<lp::FragmentField>()) {
308 if ((pkt.has<lp::FragIndexField>() || pkt.has<lp::FragCountField>()) &&
314 auto [isReassembled, netPkt, firstPkt] = m_reassembler.
receiveFragment(endpoint, pkt);
316 this->decodeNetPacket(netPkt, firstPkt, endpoint);
319 catch (
const tlv::Error& e) {
326 GenericLinkService::decodeNetPacket(
const Block& netPkt,
const lp::Packet& firstPkt,
330 switch (netPkt.type()) {
332 if (firstPkt.has<lp::NackField>()) {
333 this->decodeNack(netPkt, firstPkt, endpointId);
336 this->decodeInterest(netPkt, firstPkt, endpointId);
340 this->decodeData(netPkt, firstPkt, endpointId);
344 NFD_LOG_FACE_WARN(
"unrecognized network-layer packet TLV-TYPE " << netPkt.type() <<
": DROP");
348 catch (
const tlv::Error& e) {
355 GenericLinkService::decodeInterest(
const Block& netPkt,
const lp::Packet& firstPkt,
358 BOOST_ASSERT(netPkt.type() == tlv::Interest);
359 BOOST_ASSERT(!firstPkt.has<lp::NackField>());
362 auto interest = make_shared<Interest>(netPkt);
364 if (firstPkt.has<lp::NextHopFaceIdField>()) {
366 interest->setTag(make_shared<lp::NextHopFaceIdTag>(firstPkt.get<lp::NextHopFaceIdField>()));
374 if (firstPkt.has<lp::CachePolicyField>()) {
380 if (firstPkt.has<lp::IncomingFaceIdField>()) {
384 if (firstPkt.has<lp::CongestionMarkField>()) {
385 interest->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
388 if (firstPkt.has<lp::NonDiscoveryField>()) {
390 interest->setTag(make_shared<lp::NonDiscoveryTag>(firstPkt.get<lp::NonDiscoveryField>()));
397 if (firstPkt.has<lp::PrefixAnnouncementField>()) {
403 if (firstPkt.has<lp::PitTokenField>()) {
404 interest->setTag(make_shared<lp::PitToken>(firstPkt.get<lp::PitTokenField>()));
411 GenericLinkService::decodeData(
const Block& netPkt,
const lp::Packet& firstPkt,
414 BOOST_ASSERT(netPkt.type() == tlv::Data);
417 auto data = make_shared<Data>(netPkt);
419 if (firstPkt.has<lp::NackField>()) {
425 if (firstPkt.has<lp::NextHopFaceIdField>()) {
431 if (firstPkt.has<lp::CachePolicyField>()) {
435 data->setTag(make_shared<lp::CachePolicyTag>(firstPkt.get<lp::CachePolicyField>()));
438 if (firstPkt.has<lp::IncomingFaceIdField>()) {
442 if (firstPkt.has<lp::CongestionMarkField>()) {
443 data->setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
446 if (firstPkt.has<lp::NonDiscoveryField>()) {
452 if (firstPkt.has<lp::PrefixAnnouncementField>()) {
454 data->setTag(make_shared<lp::PrefixAnnouncementTag>(firstPkt.get<lp::PrefixAnnouncementField>()));
457 NFD_LOG_FACE_WARN(
"received PrefixAnnouncement, but self-learning disabled: IGNORE");
465 GenericLinkService::decodeNack(
const Block& netPkt,
const lp::Packet& firstPkt,
468 BOOST_ASSERT(netPkt.type() == tlv::Interest);
469 BOOST_ASSERT(firstPkt.has<lp::NackField>());
471 lp::Nack nack((Interest(netPkt)));
472 nack.setHeader(firstPkt.get<lp::NackField>());
474 if (firstPkt.has<lp::NextHopFaceIdField>()) {
480 if (firstPkt.has<lp::CachePolicyField>()) {
486 if (firstPkt.has<lp::IncomingFaceIdField>()) {
490 if (firstPkt.has<lp::CongestionMarkField>()) {
491 nack.setTag(make_shared<lp::CongestionMarkTag>(firstPkt.get<lp::CongestionMarkField>()));
494 if (firstPkt.has<lp::NonDiscoveryField>()) {
500 if (firstPkt.has<lp::PrefixAnnouncementField>()) {
Options that control the behavior of GenericLinkService.
LpReassembler::Options reassemblerOptions
Options for reassembly.
LpFragmenter::Options fragmenterOptions
Options for fragmentation.
bool allowReassembly
Enables reassembly.
bool allowSelfLearning
Enables self-learning forwarding support.
LpReliability::Options reliabilityOptions
Options for reliability.
bool allowCongestionMarking
Enables send queue congestion detection and marking.
bool allowFragmentation
Enables fragmentation.
ssize_t overrideMtu
Overrides the MTU provided by Transport.
size_t defaultCongestionThreshold
Default congestion threshold in bytes.
time::nanoseconds baseCongestionMarkingInterval
Starting value for congestion marking interval.
bool allowLocalFields
Enables encoding of IncomingFaceId, and decoding of NextHopFaceId and CachePolicy.
PacketCounter nInNetInvalid
Count of invalid reassembled network-layer packets dropped.
PacketCounter nDuplicateSequence
Count of LpPackets dropped due to duplicate Sequence numbers.
SizeCounter< LpReassembler > nReassembling
Count of network-layer packets currently being reassembled.
PacketCounter nFragmentationErrors
Count of failed fragmentations.
PacketCounter nCongestionMarked
Count of outgoing LpPackets that were marked with congestion marks.
PacketCounter nOutOverMtu
Count of outgoing LpPackets dropped due to exceeding MTU limit.
PacketCounter nInLpInvalid
Count of invalid LpPackets dropped before reassembly.
PacketCounter nReassemblyTimeouts
Count of dropped partial network-layer packets due to reassembly timeout.
ssize_t getEffectiveMtu() const final
void setOptions(const Options &options)
Sets the options used by GenericLinkService.
GenericLinkService(const Options &options={})
bool canOverrideMtuTo(ssize_t mtu) const
Whether MTU can be overridden to the specified value.
void receiveNack(const lp::Nack &nack, const EndpointId &endpoint)
Delivers received Nack to forwarding.
void receiveInterest(const Interest &interest, const EndpointId &endpoint)
Delivers received Interest to forwarding.
void notifyDroppedInterest(const Interest &packet)
void receiveData(const Data &data, const EndpointId &endpoint)
Delivers received Data to forwarding.
const Transport * getTransport() const noexcept
Returns the Transport to which this LinkService is attached.
void sendPacket(const Block &packet)
Send a lower-layer packet via Transport.
void setOptions(const Options &options)
Set options for fragmenter.
std::tuple< bool, std::vector< lp::Packet > > fragmentPacket(const lp::Packet &packet, size_t mtu)
Fragments a network-layer packet into link-layer packets.
std::tuple< bool, Block, lp::Packet > receiveFragment(const EndpointId &remoteEndpoint, const lp::Packet &packet)
Adds received fragment to the buffer.
signal::Signal< LpReassembler, EndpointId, size_t > beforeTimeout
Notifies before a partial packet is dropped due to timeout.
void setOptions(const Options &options)
Set options for reassembler.
signal::Signal< LpReliability, Interest > onDroppedInterest
Signals on Interest dropped by reliability system for exceeding allowed number of retx.
void piggyback(lp::Packet &pkt, ssize_t mtu)
Called by GenericLinkService to attach Acks onto an outgoing LpPacket.
static constexpr size_t RESERVED_HEADER_SPACE
TxSequence TLV-TYPE (3 octets) + TLV-LENGTH (1 octet) + lp::Sequence (8 octets)
void handleOutgoing(std::vector< lp::Packet > &frags, lp::Packet &&pkt, bool isInterest)
Observe outgoing fragment(s) of a network packet and store for potential retransmission.
bool processIncomingPacket(const lp::Packet &pkt)
Extract and parse all Acks and add Ack for contained Fragment (if any) to AckQueue.
void setOptions(const Options &options)
Set options for reliability.
virtual ssize_t getSendQueueLength()
Returns the current send queue length of the transport (in octets).
#define NFD_LOG_FACE_DEBUG(msg)
Log a message at DEBUG level.
#define NFD_LOG_FACE_WARN(msg)
Log a message at WARN level.
#define NFD_LOG_FACE_TRACE(msg)
Log a message at TRACE level.
#define NFD_LOG_INIT(name)
std::variant< std::monostate, ethernet::Address, udp::Endpoint > EndpointId
Identifies a remote endpoint on the link.
constexpr size_t CONGESTION_MARK_SIZE
constexpr ssize_t MIN_MTU
Minimum MTU that may be set.
constexpr ssize_t MTU_UNLIMITED
Indicates that the transport has no limit on payload size.
bool isEnabled
Enables link-layer reliability.