实现 TCP 协议中的发送器 TCPSender
。
TCP 通过累积确认的方式来标记哪些段不需要跟踪了,发送方收到了一个 ackno
,表示接收方已经接收了 ackno
之前的所有数据。可以使用 queue
来跟踪没被确认的,一旦确认了就出队,同时也方便重传数据。
累积确认类似与 GBN(go back N)协议。在接收方 GBN 机制选择将待确认的全部丢弃了,在这次实验中 (Lab2), 选择了缓存这些待确认的,这类似与 SR(Selective Repeat)协议。
定时器机制
- 一段报文被发送时,定时器没有开启就开启,开启了就什么都不做。
- 定时器超时,这是就要重传还没有被确认的序号最小的段,同时,超时时间限制要扩大两倍,定时器重新计时。
- 收到一个合法的
ackno
时(严格大于已经被确认的最大的序号),定时器的超时时间限制回到初始值,并且不再跟踪已经确认的段,如果还有跟踪的段,开启定时器。
如果一个段的一部分被确认了,这个段仍然被认为是没有被确认的,事实上,TCP的实现上可以认为它是被确认的。作为实验,这里简化了操作。
需要注意的是,第一次握手时发送的段只包含 SYN
不包含其他数据。
在 Lab3 中 我们有一个发送空段的函数 send_empty_segment
可以用来探测接收方容量窗口大小,但是在 Lab2 中,并没处理接收一个空段会怎样。
tcp_receiver.cc
1 2 3 4 5 6 7 8 9 10
| bool TCPReceiver::segment_received(const TCPSegment &seg) { const uint64_t abs_seqno = unwrap(seg.header().seqno, _isn.value(), _pos); const uint64_t index = abs_seqno + (issyn ? 1 : 0);
const bool received_empty_segment = abs_seqno == _pos && seg.length_in_sequence_space() == 0; if (received_empty_segment) return true; }
|
tcp_sender.hh
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| class TCPSender { private: uint64_t _recv_ackno{0};
bool _syn_flag{false}; bool _fin_flag{false};
uint64_t _windows_size{0}; std::queue<TCPSegment> _segments_outstanding{};
uint64_t _timer{0}; uint64_t _timeout; bool _time_running{false};
uint64_t _byte_in_flight{0}; uint64_t _consecutive_retransmissions{0};
|
tcp_sender.cc
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
| TCPSender::TCPSender(const size_t capacity, const uint16_t retx_timeout, const std::optional<WrappingInt32> fixed_isn) : _isn(fixed_isn.value_or(WrappingInt32{random_device()()})) , _initial_retransmission_timeout{retx_timeout} , _stream(capacity) , _timeout{retx_timeout} {}
uint64_t TCPSender::bytes_in_flight() const { return _byte_in_flight; }
void TCPSender::fill_window() { if (!_syn_flag) { TCPSegment seg{}; seg.header().syn = true; send_segment(seg); _syn_flag = true; return; }
const size_t win_size = _windows_size ? _windows_size : 1; size_t remain_win{}; while ((remain_win = win_size - _next_seqno + _recv_ackno) > 0 && !_fin_flag) { TCPSegment seg{}; const uint16_t length = std::min(TCPConfig::MAX_PAYLOAD_SIZE, remain_win); std::string data = _stream.read(length); seg.payload() = Buffer{std::move(data)};
const bool eof = _stream.eof() && seg.length_in_sequence_space() < win_size; if (eof) { seg.header().fin = true; _fin_flag = true; }
const bool empty_segment = seg.length_in_sequence_space() == 0; if (empty_segment) return; send_segment(seg); } }
bool TCPSender::ack_received(const WrappingInt32 ackno, const uint16_t window_size) { const uint64_t abs_seqno = unwrap(ackno, _isn, _recv_ackno);
if (abs_seqno > _next_seqno) return false; _windows_size = window_size;
if (abs_seqno <= _recv_ackno) return true; _recv_ackno = abs_seqno;
_timeout = _initial_retransmission_timeout; _consecutive_retransmissions = 0;
while (!_segments_outstanding.empty()) { TCPSegment& seg = _segments_outstanding.front(); const bool ac_segments = unwrap(seg.header().seqno, _isn, _recv_ackno) + seg.length_in_sequence_space() <= abs_seqno; if (ac_segments) { _byte_in_flight -= seg.length_in_sequence_space(); _segments_outstanding.pop(); } else break; }
fill_window(); if (!_segments_outstanding.empty()) { _time_running = true; _timer = 0; } return true; }
void TCPSender::tick(const size_t ms_since_last_tick) { _timer += ms_since_last_tick;
const bool timeout_resend = _timer >= _timeout && !_segments_outstanding.empty();
if (timeout_resend) { _segments_out.push(_segments_outstanding.front()); _consecutive_retransmissions++; _timeout *= 2; _time_running = true; _timer = 0; }
if (_segments_outstanding.empty()) { _time_running = false; } }
unsigned int TCPSender::consecutive_retransmissions() const { return _consecutive_retransmissions; }
void TCPSender::send_empty_segment() { TCPSegment res{}; res.header().seqno = wrap(_next_seqno, _isn); _segments_out.push(res); }
void TCPSender::send_segment(TCPSegment& seg) { seg.header().seqno = wrap(_next_seqno, _isn); _next_seqno += seg.length_in_sequence_space(); _byte_in_flight += seg.length_in_sequence_space(); _segments_out.push(seg); _segments_outstanding.push(seg); if (!_time_running) { _time_running = true; _timer = 0; } }
|