36 , m_linkService(linkService)
37 , m_firstUnackedFrag(m_unackedFrags.begin())
39 , m_isIdleAckTimerRunning(false)
41 BOOST_ASSERT(m_linkService !=
nullptr);
51 this->stopIdleAckTimer();
68 auto unackedFragsIt = m_unackedFrags.begin();
69 auto sendTime = time::steady_clock::now();
71 auto netPkt = make_shared<NetPkt>(std::move(pkt), isInterest);
72 netPkt->unackedFrags.reserve(frags.size());
74 for (lp::Packet& frag : frags) {
76 lp::Sequence txSeq = assignTxSequence(frag);
79 unackedFragsIt = m_unackedFrags.emplace_hint(unackedFragsIt,
80 std::piecewise_construct,
81 std::forward_as_tuple(txSeq),
82 std::forward_as_tuple(frag));
83 unackedFragsIt->second.sendTime = sendTime;
85 [=] { onLpPacketLost(txSeq); });
86 unackedFragsIt->second.netPkt = netPkt;
88 if (m_unackedFrags.size() == 1) {
89 m_firstUnackedFrag = m_unackedFrags.begin();
93 netPkt->unackedFrags.push_back(unackedFragsIt);
102 auto now = time::steady_clock::now();
105 for (lp::Sequence ackSeq : pkt.list<lp::AckField>()) {
106 auto fragIt = m_unackedFrags.find(ackSeq);
107 if (fragIt == m_unackedFrags.end()) {
111 auto& frag = fragIt->second;
114 frag.rtoTimer.cancel();
116 if (frag.retxCount == 0) {
118 m_rto.
addMeasurement(time::duration_cast<RttEstimator::Duration>(now - frag.sendTime));
124 auto lostLpPackets = findLostLpPackets(fragIt);
128 onLpPacketAcknowledged(fragIt);
134 std::set<lp::Sequence> removedLpPackets;
137 for (lp::Sequence txSeq : lostLpPackets) {
138 if (removedLpPackets.find(txSeq) == removedLpPackets.end()) {
139 auto removedThisTxSeq = this->onLpPacketLost(txSeq);
140 for (
auto removedTxSeq : removedThisTxSeq) {
141 removedLpPackets.insert(removedTxSeq);
148 if (pkt.has<lp::FragmentField>() && pkt.has<lp::TxSequenceField>()) {
149 m_ackQueue.push(pkt.get<lp::TxSequenceField>());
150 if (!m_isIdleAckTimerRunning) {
151 this->startIdleAckTimer();
160 BOOST_ASSERT(pkt.wireEncode().type() == lp::tlv::LpPacket);
163 ssize_t pktSize = pkt.wireEncode().size();
164 ssize_t reservedSpace = tlv::sizeOfVarNumber(ndn::MAX_NDN_PACKET_SIZE) -
165 tlv::sizeOfVarNumber(pktSize);
166 ssize_t remainingSpace = (mtu ==
MTU_UNLIMITED ? ndn::MAX_NDN_PACKET_SIZE : mtu) - reservedSpace;
167 remainingSpace -= pktSize;
169 while (!m_ackQueue.empty()) {
170 lp::Sequence ackSeq = m_ackQueue.front();
172 const ssize_t ackSize = tlv::sizeOfVarNumber(lp::tlv::Ack) +
173 tlv::sizeOfVarNumber(
sizeof(lp::Sequence)) +
174 sizeof(lp::Sequence);
176 if (ackSize > remainingSpace) {
180 pkt.add<lp::AckField>(ackSeq);
182 remainingSpace -= ackSize;
187 LpReliability::assignTxSequence(lp::Packet& frag)
189 lp::Sequence txSeq = ++m_lastTxSeqNo;
190 frag.set<lp::TxSequenceField>(txSeq);
191 if (m_unackedFrags.size() > 0 && m_lastTxSeqNo == m_firstUnackedFrag->first) {
192 NDN_THROW(std::length_error(
"TxSequence range exceeded"));
194 return m_lastTxSeqNo;
198 LpReliability::startIdleAckTimer()
200 BOOST_ASSERT(!m_isIdleAckTimerRunning);
201 m_isIdleAckTimerRunning =
true;
204 while (!m_ackQueue.empty()) {
205 m_linkService->requestIdlePacket();
208 m_isIdleAckTimerRunning =
false;
213 LpReliability::stopIdleAckTimer()
215 m_idleAckTimer.cancel();
216 m_isIdleAckTimerRunning =
false;
219 std::vector<lp::Sequence>
222 std::vector<lp::Sequence> lostLpPackets;
224 for (
auto it = m_firstUnackedFrag; ; ++it) {
225 if (it == m_unackedFrags.end()) {
226 it = m_unackedFrags.begin();
229 if (it->first == ackIt->first) {
233 auto& unackedFrag = it->second;
234 unackedFrag.nGreaterSeqAcks++;
237 lostLpPackets.push_back(it->first);
241 return lostLpPackets;
244 std::vector<lp::Sequence>
245 LpReliability::onLpPacketLost(lp::Sequence txSeq)
247 BOOST_ASSERT(m_unackedFrags.count(txSeq) > 0);
248 auto txSeqIt = m_unackedFrags.find(txSeq);
250 auto& txFrag = txSeqIt->second;
251 txFrag.rtoTimer.cancel();
252 auto netPkt = txFrag.netPkt;
253 std::vector<lp::Sequence> removedThisTxSeq;
256 if (txFrag.retxCount >= m_options.
maxRetx) {
258 for (
size_t i = 0; i < netPkt->unackedFrags.size(); i++) {
259 if (netPkt->unackedFrags[i] != txSeqIt) {
260 removedThisTxSeq.push_back(netPkt->unackedFrags[i]->first);
261 deleteUnackedFrag(netPkt->unackedFrags[i]);
268 if (netPkt->isInterest) {
269 BOOST_ASSERT(netPkt->pkt.has<lp::FragmentField>());
270 ndn::Buffer::const_iterator fragBegin, fragEnd;
271 std::tie(fragBegin, fragEnd) = netPkt->pkt.get<lp::FragmentField>();
272 Block frag(&*fragBegin, std::distance(fragBegin, fragEnd));
276 removedThisTxSeq.push_back(txSeqIt->first);
277 deleteUnackedFrag(txSeqIt);
281 lp::Sequence newTxSeq = assignTxSequence(txFrag.pkt);
282 netPkt->didRetx =
true;
285 auto newTxFragIt = m_unackedFrags.emplace_hint(
286 m_firstUnackedFrag != m_unackedFrags.end() && m_firstUnackedFrag->first > newTxSeq
288 : m_unackedFrags.end(),
289 std::piecewise_construct,
290 std::forward_as_tuple(newTxSeq),
291 std::forward_as_tuple(txFrag.pkt));
292 auto& newTxFrag = newTxFragIt->second;
293 newTxFrag.retxCount = txFrag.retxCount + 1;
294 newTxFrag.netPkt = netPkt;
297 auto fragInNetPkt = std::find(netPkt->unackedFrags.begin(), netPkt->unackedFrags.end(), txSeqIt);
298 BOOST_ASSERT(fragInNetPkt != netPkt->unackedFrags.end());
299 *fragInNetPkt = newTxFragIt;
301 removedThisTxSeq.push_back(txSeqIt->first);
302 deleteUnackedFrag(txSeqIt);
305 m_linkService->sendLpPacket(lp::Packet(newTxFrag.pkt));
311 return removedThisTxSeq;
317 auto netPkt = fragIt->second.netPkt;
320 auto fragInNetPkt = std::find(netPkt->unackedFrags.begin(), netPkt->unackedFrags.end(), fragIt);
321 BOOST_ASSERT(fragInNetPkt != netPkt->unackedFrags.end());
322 *fragInNetPkt = netPkt->unackedFrags.back();
323 netPkt->unackedFrags.pop_back();
326 if (netPkt->unackedFrags.empty()) {
327 if (netPkt->didRetx) {
335 deleteUnackedFrag(fragIt);
341 lp::Sequence firstUnackedTxSeq = m_firstUnackedFrag->first;
342 lp::Sequence currentTxSeq = fragIt->first;
343 auto nextFragIt = m_unackedFrags.erase(fragIt);
345 if (!m_unackedFrags.empty() && firstUnackedTxSeq == currentTxSeq) {
347 if (nextFragIt == m_unackedFrags.end()) {
348 m_firstUnackedFrag = m_unackedFrags.begin();
351 m_firstUnackedFrag = nextFragIt;
354 else if (m_unackedFrags.empty()) {
355 m_firstUnackedFrag = m_unackedFrags.end();
359 LpReliability::UnackedFrag::UnackedFrag(lp::Packet pkt)
360 : pkt(std::move(pkt))
361 , sendTime(time::steady_clock::now())
367 LpReliability::NetPkt::NetPkt(lp::Packet&& pkt,
bool isInterest)
368 : pkt(std::move(pkt))
369 , isInterest(isInterest)
void processIncomingPacket(const lp::Packet &pkt)
extract and parse all Acks and add Ack for contained Fragment (if any) to AckQueue ...
const GenericLinkService * getLinkService() const
const ssize_t MTU_UNLIMITED
indicates the transport has no limit on payload size
void piggyback(lp::Packet &pkt, ssize_t mtu)
called by GenericLinkService to attach Acks onto an outgoing LpPacket
bool isEnabled
enables link-layer reliability
PacketCounter nAcknowledged
count of network-layer packets that did not require retransmission of a fragment
GenericLinkService is a LinkService that implements the NDNLPv2 protocol.
size_t maxRetx
maximum number of retransmissions for an LpPacket
Scheduler & getScheduler()
Returns the global Scheduler instance for the calling thread.
Table::const_iterator iterator
size_t seqNumLossThreshold
a fragment is considered lost if this number of fragments with greater sequence numbers are acknowled...
Copyright (c) 2014-2015, Regents of the University of California, Arizona Board of Regents...
LpReliability(const Options &options, GenericLinkService *linkService)
void handleOutgoing(std::vector< lp::Packet > &frags, lp::Packet &&pkt, bool isInterest)
observe outgoing fragment(s) of a network packet and store for potential retransmission ...
time::nanoseconds idleAckTimerPeriod
period between sending pending Acks in an IDLE packet
void setOptions(const Options &options)
set options for reliability
signal::Signal< LpReliability, Interest > onDroppedInterest
signals on Interest dropped by reliability system for exceeding allowed number of retx ...
PacketCounter nRetxExhausted
count of network-layer packets dropped because a fragment reached the maximum number of retransmissio...
Duration computeRto() const
void addMeasurement(Duration measure)
PacketCounter nRetransmitted
count of network-layer packets that had at least one fragment retransmitted, but were eventually rece...