Skip to content

Commit b93a0bf

Browse files
authored
fix: Backlog never zero despite messages received (#204)
1 parent a0822f4 commit b93a0bf

2 files changed

Lines changed: 7 additions & 5 deletions

File tree

google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker_impl.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ async def ack(self, offset: int):
5454
if receipt == ack:
5555
prefix_acked_offset = receipt
5656
continue
57-
self._receipts.append(receipt)
57+
self._receipts.appendleft(receipt)
5858
self._acks.put(ack)
5959
break
6060
if prefix_acked_offset is None:

tests/unit/pubsublite/cloudpubsub/internal/ack_set_tracker_impl_test.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,17 @@ async def test_track_and_aggregate_acks(committer, tracker: AckSetTracker):
5151
committer.commit.assert_has_calls([])
5252
await tracker.ack(offset=3)
5353
committer.commit.assert_has_calls([])
54-
await tracker.ack(offset=5)
55-
committer.commit.assert_has_calls([])
5654
await tracker.ack(offset=1)
57-
committer.commit.assert_has_calls([call(Cursor(offset=6))])
55+
committer.commit.assert_has_calls([call(Cursor(offset=4))])
56+
await tracker.ack(offset=5)
57+
committer.commit.assert_has_calls(
58+
[call(Cursor(offset=4)), call(Cursor(offset=6))]
59+
)
5860

5961
tracker.track(offset=8)
6062
await tracker.ack(offset=7)
6163
committer.commit.assert_has_calls(
62-
[call(Cursor(offset=6)), call(Cursor(offset=8))]
64+
[call(Cursor(offset=4)), call(Cursor(offset=6)), call(Cursor(offset=8))]
6365
)
6466
committer.__aexit__.assert_called_once()
6567

0 commit comments

Comments
 (0)