Skip to content

Commit a037d0b

Browse files
feat: Wire batching settings through publisher client factories (#42)
* feat: Wire batching settings through publisher client factories.
* chore: fix formatting
1 parent f41c228 commit a037d0b

4 files changed

Lines changed: 47 additions & 17 deletions

File tree

google/cloud/pubsublite/cloudpubsub/make_publisher.py

Lines changed: 19 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -2,6 +2,7 @@
22

33
from google.api_core.client_options import ClientOptions
44
from google.auth.credentials import Credentials
5+
from google.cloud.pubsub_v1.types import BatchSettings
56

67
from google.cloud.pubsublite.cloudpubsub.internal.async_publisher_impl import (
78
AsyncPublisherImpl,
@@ -10,15 +11,19 @@
1011
from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher, Publisher
1112
from google.cloud.pubsublite.internal.wire.make_publisher import (
1213
make_publisher as make_wire_publisher,
14+
DEFAULT_BATCHING_SETTINGS as WIRE_DEFAULT_BATCHING,
1315
)
1416
from google.cloud.pubsublite.internal.wire.merge_metadata import merge_metadata
1517
from google.cloud.pubsublite.internal.wire.pubsub_context import pubsub_context
1618
from google.cloud.pubsublite.paths import TopicPath
1719

1820

21+
DEFAULT_BATCHING_SETTINGS = WIRE_DEFAULT_BATCHING
22+
23+
1924
def make_async_publisher(
2025
topic: TopicPath,
21-
batching_delay_secs: Optional[float] = None,
26+
per_partition_batching_settings: Optional[BatchSettings] = None,
2227
credentials: Optional[Credentials] = None,
2328
client_options: Optional[ClientOptions] = None,
2429
metadata: Optional[Mapping[str, str]] = None,
@@ -28,7 +33,7 @@ def make_async_publisher(
2833
2934
Args:
3035
topic: The topic to publish to.
31-
batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases.
36+
per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases.
3237
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
3338
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
3439
metadata: Additional metadata to send with the RPC.
@@ -43,15 +48,19 @@ def make_async_publisher(
4348

4449
def underlying_factory():
4550
return make_wire_publisher(
46-
topic, batching_delay_secs, credentials, client_options, metadata
51+
topic,
52+
per_partition_batching_settings,
53+
credentials,
54+
client_options,
55+
metadata,
4756
)
4857

4958
return AsyncPublisherImpl(underlying_factory)
5059

5160

5261
def make_publisher(
5362
topic: TopicPath,
54-
batching_delay_secs: Optional[float] = None,
63+
per_partition_batching_settings: Optional[BatchSettings] = None,
5564
credentials: Optional[Credentials] = None,
5665
client_options: Optional[ClientOptions] = None,
5766
metadata: Optional[Mapping[str, str]] = None,
@@ -61,7 +70,7 @@ def make_publisher(
6170
6271
Args:
6372
topic: The topic to publish to.
64-
batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases.
73+
per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases.
6574
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
6675
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
6776
metadata: Additional metadata to send with the RPC.
@@ -74,6 +83,10 @@ def make_publisher(
7483
"""
7584
return PublisherImpl(
7685
make_async_publisher(
77-
topic, batching_delay_secs, credentials, client_options, metadata
86+
topic,
87+
per_partition_batching_settings,
88+
credentials,
89+
client_options,
90+
metadata,
7891
)
7992
)

google/cloud/pubsublite/internal/wire/make_publisher.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from typing import AsyncIterator, Mapping, Optional, MutableMapping
22

3+
from google.cloud.pubsub_v1.types import BatchSettings
4+
35
from google.cloud.pubsublite.make_admin_client import make_admin_client
46
from google.cloud.pubsublite.endpoints import regional_endpoint
57
from google.cloud.pubsublite.internal.wire.default_routing_policy import (
@@ -23,9 +25,18 @@
2325
from google.auth.credentials import Credentials
2426

2527

28+
DEFAULT_BATCHING_SETTINGS = BatchSettings(
29+
max_bytes=(
30+
3 * 1024 * 1024
31+
), # 3 MiB to stay 1 MiB below GRPC's 4 MiB per-message limit.
32+
max_messages=1000,
33+
max_latency=0.05, # 50 ms
34+
)
35+
36+
2637
def make_publisher(
2738
topic: TopicPath,
28-
batching_delay_secs: Optional[float] = None,
39+
per_partition_batching_settings: Optional[BatchSettings] = None,
2940
credentials: Optional[Credentials] = None,
3041
client_options: Optional[ClientOptions] = None,
3142
metadata: Optional[Mapping[str, str]] = None,
@@ -35,7 +46,7 @@ def make_publisher(
3546
3647
Args:
3748
topic: The topic to publish to.
38-
batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases.
49+
per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases.
3950
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
4051
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
4152
metadata: Additional metadata to send with the RPC.
@@ -46,9 +57,8 @@ def make_publisher(
4657
Throws:
4758
GoogleApiCallException on any error determining topic structure.
4859
"""
49-
batching_delay_secs = (
50-
batching_delay_secs if batching_delay_secs is not None else 0.05
51-
)
60+
if per_partition_batching_settings is None:
61+
per_partition_batching_settings = DEFAULT_BATCHING_SETTINGS
5262
admin_client = make_admin_client(
5363
region=topic.location.region,
5464
credentials=credentials,
@@ -76,7 +86,7 @@ def connection_factory(requests: AsyncIterator[PublishRequest]):
7686

7787
clients[partition] = SinglePartitionPublisher(
7888
InitialPublishRequest(topic=str(topic), partition=partition.value),
79-
batching_delay_secs,
89+
per_partition_batching_settings,
8090
GapicConnectionFactory(connection_factory),
8191
)
8292
return RoutingPublisher(DefaultRoutingPolicy(partition_count), clients)

google/cloud/pubsublite/internal/wire/single_partition_publisher.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
from typing import Optional, List, Iterable
33

44
from absl import logging
5+
from google.cloud.pubsub_v1.types import BatchSettings
6+
57
from google.cloud.pubsublite.internal.wire.publisher import Publisher
68
from google.cloud.pubsublite.internal.wire.retrying_connection import (
79
RetryingConnection,
@@ -40,7 +42,7 @@ class SinglePartitionPublisher(
4042
BatchTester[PubSubMessage],
4143
):
4244
_initial: InitialPublishRequest
43-
_flush_seconds: float
45+
_batching_settings: BatchSettings
4446
_connection: RetryingConnection[PublishRequest, PublishResponse]
4547

4648
_batcher: SerialBatcher[PubSubMessage, Cursor]
@@ -52,11 +54,11 @@ class SinglePartitionPublisher(
5254
def __init__(
5355
self,
5456
initial: InitialPublishRequest,
55-
flush_seconds: float,
57+
batching_settings: BatchSettings,
5658
factory: ConnectionFactory[PublishRequest, PublishResponse],
5759
):
5860
self._initial = initial
59-
self._flush_seconds = flush_seconds
61+
self._batching_settings = batching_settings
6062
self._connection = RetryingConnection(factory, self)
6163
self._batcher = SerialBatcher(self)
6264
self._outstanding_writes = []
@@ -117,7 +119,7 @@ async def _receive_loop(self):
117119
async def _flush_loop(self):
118120
try:
119121
while True:
120-
await asyncio.sleep(self._flush_seconds)
122+
await asyncio.sleep(self._batching_settings.max_latency)
121123
await self._flush()
122124
except asyncio.CancelledError:
123125
return

tests/unit/pubsublite/internal/wire/single_partition_publisher_test.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55

66
from asynctest.mock import MagicMock, CoroutineMock
77
import pytest
8+
from google.cloud.pubsub_v1.types import BatchSettings
9+
810
from google.cloud.pubsublite.internal.wire.connection import (
911
Connection,
1012
ConnectionFactory,
@@ -25,6 +27,9 @@
2527
from google.cloud.pubsublite.internal.wire.retrying_connection import _MIN_BACKOFF_SECS
2628

2729
FLUSH_SECONDS = 100000
30+
BATCHING_SETTINGS = BatchSettings(
31+
max_bytes=3 * 1024 * 1024, max_messages=1000, max_latency=FLUSH_SECONDS
32+
)
2833

2934
# All test coroutines will be treated as marked.
3035
pytestmark = pytest.mark.asyncio
@@ -81,7 +86,7 @@ async def sleeper(delay: float):
8186
@pytest.fixture()
8287
def publisher(connection_factory, initial_request):
8388
return SinglePartitionPublisher(
84-
initial_request.initial_request, FLUSH_SECONDS, connection_factory
89+
initial_request.initial_request, BATCHING_SETTINGS, connection_factory
8590
)
8691

8792

0 commit comments

Comments
 (0)