diff --git a/chunk_streamer.go b/chunk_streamer.go index d11a9fb..04c07c2 100644 --- a/chunk_streamer.go +++ b/chunk_streamer.go @@ -116,10 +116,11 @@ again: if err != nil { return nil, err } - if cs.r.totalReadBytes > uint64(cs.peerState.ackWindowSize/2) { // TODO: fix size - if err := cs.sendAck(); err != nil { + if cs.r.FragmentReadBytes() >= uint32(cs.peerState.ackWindowSize/2) { // TODO: fix size + if err := cs.sendAck(cs.r.TotalReadBytes()); err != nil { return nil, err } + cs.r.ResetFragmentReadBytes() } if !isCompleted { @@ -346,11 +347,11 @@ func (cs *ChunkStreamer) prepareChunkWriter(chunkStreamID int) (*ChunkStreamWrit return writer, nil } -func (cs *ChunkStreamer) sendAck() error { - cs.logger.Infof("Sending Ack...") +func (cs *ChunkStreamer) sendAck(readBytes uint32) error { + cs.logger.Infof("Sending Ack...: Bytes = %d", readBytes) // TODO: chunk stream id and fix timestamp return cs.controlStreamWriter(2, 0, &message.Ack{ - SequenceNumber: uint32(cs.r.totalReadBytes), + SequenceNumber: readBytes, }) } diff --git a/chunk_streamer_reader.go b/chunk_streamer_reader.go index ec9830a..b1d4813 100644 --- a/chunk_streamer_reader.go +++ b/chunk_streamer_reader.go @@ -12,12 +12,26 @@ import ( ) type ChunkStreamerReader struct { - reader io.Reader - totalReadBytes uint64 + reader io.Reader + totalReadBytes uint32 // TODO: Check overflow + fragmentReadBytes uint32 } func (r *ChunkStreamerReader) Read(b []byte) (int, error) { n, err := r.reader.Read(b) - r.totalReadBytes += uint64(n) + r.totalReadBytes += uint32(n) + r.fragmentReadBytes += uint32(n) return n, err } + +func (r *ChunkStreamerReader) TotalReadBytes() uint32 { + return r.totalReadBytes +} + +func (r *ChunkStreamerReader) FragmentReadBytes() uint32 { + return r.fragmentReadBytes +} + +func (r *ChunkStreamerReader) ResetFragmentReadBytes() { + r.fragmentReadBytes = 0 +}