Skip to content

Commit 3236752

Browse files
authored
feat: Publish idempotence (#408)
Implements publish idempotence (default enabled), where the server will ensure that unique messages within a single publisher session are only stored once.
1 parent 91a8094 commit 3236752

8 files changed

Lines changed: 323 additions & 45 deletions

File tree

google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
AsyncSinglePublisher,
2929
SinglePublisher,
3030
)
31+
from google.cloud.pubsublite.internal.publisher_client_id import PublisherClientId
3132
from google.cloud.pubsublite.internal.wire.make_publisher import (
3233
make_publisher as make_wire_publisher,
3334
DEFAULT_BATCHING_SETTINGS as WIRE_DEFAULT_BATCHING,
@@ -47,6 +48,7 @@ def make_async_publisher(
4748
credentials: Optional[Credentials] = None,
4849
client_options: Optional[ClientOptions] = None,
4950
metadata: Optional[Mapping[str, str]] = None,
51+
client_id: Optional[PublisherClientId] = None,
5052
) -> AsyncSinglePublisher:
5153
"""
5254
Make a new publisher for the given topic.
@@ -58,6 +60,7 @@ def make_async_publisher(
5860
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
5961
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
6062
metadata: Additional metadata to send with the RPC.
63+
client_id: 128-bit unique client id. If set, enables publish idempotency for the session.
6164
6265
Returns:
6366
A new AsyncPublisher.
@@ -75,6 +78,7 @@ def underlying_factory():
7578
credentials=credentials,
7679
client_options=client_options,
7780
metadata=metadata,
81+
client_id=client_id,
7882
)
7983

8084
return AsyncSinglePublisherImpl(underlying_factory)
@@ -87,6 +91,7 @@ def make_publisher(
8791
credentials: Optional[Credentials] = None,
8892
client_options: Optional[ClientOptions] = None,
8993
metadata: Optional[Mapping[str, str]] = None,
94+
client_id: Optional[PublisherClientId] = None,
9095
) -> SinglePublisher:
9196
"""
9297
Make a new publisher for the given topic.
@@ -98,6 +103,7 @@ def make_publisher(
98103
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
99104
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
100105
metadata: Additional metadata to send with the RPC.
106+
client_id: 128-bit unique client id. If set, enables publish idempotency for the session.
101107
102108
Returns:
103109
A new Publisher.
@@ -113,5 +119,6 @@ def make_publisher(
113119
credentials=credentials,
114120
client_options=client_options,
115121
metadata=metadata,
122+
client_id=client_id,
116123
)
117124
)

google/cloud/pubsublite/cloudpubsub/publisher_client.py

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
from concurrent.futures import Future
1616
from typing import Optional, Mapping, Union
17+
from uuid import uuid4
1718

1819
from google.api_core.client_options import ClientOptions
1920
from google.auth.credentials import Credentials
@@ -36,6 +37,7 @@
3637
from google.cloud.pubsublite.internal.constructable_from_service_account import (
3738
ConstructableFromServiceAccount,
3839
)
40+
from google.cloud.pubsublite.internal.publisher_client_id import PublisherClientId
3941
from google.cloud.pubsublite.internal.require_started import RequireStarted
4042
from google.cloud.pubsublite.internal.wire.make_publisher import (
4143
DEFAULT_BATCHING_SETTINGS as WIRE_DEFAULT_BATCHING,
@@ -44,6 +46,10 @@
4446
from overrides import overrides
4547

4648

49+
def _get_client_id(enable_idempotence: bool):
50+
return PublisherClientId(uuid4().bytes) if enable_idempotence else None
51+
52+
4753
class PublisherClient(PublisherClientInterface, ConstructableFromServiceAccount):
4854
"""
4955
A PublisherClient publishes messages similar to Google Pub/Sub.
@@ -53,7 +59,7 @@ class PublisherClient(PublisherClientInterface, ConstructableFromServiceAccount)
5359
"""
5460

5561
_impl: PublisherClientInterface
56-
_require_stared: RequireStarted
62+
_require_started: RequireStarted
5763

5864
DEFAULT_BATCHING_SETTINGS = WIRE_DEFAULT_BATCHING
5965
"""
@@ -67,6 +73,7 @@ def __init__(
6773
credentials: Optional[Credentials] = None,
6874
transport: str = "grpc_asyncio",
6975
client_options: Optional[ClientOptions] = None,
76+
enable_idempotence: bool = True,
7077
):
7178
"""
7279
Create a new PublisherClient.
@@ -76,17 +83,20 @@ def __init__(
7683
credentials: If provided, the credentials to use when connecting.
7784
transport: The transport to use. Must correspond to an asyncio transport.
7885
client_options: The client options to use when connecting. If used, must explicitly set `api_endpoint`.
86+
enable_idempotence: Whether idempotence is enabled, where the server will ensure that unique messages within a single publisher session are stored only once.
7987
"""
88+
client_id = _get_client_id(enable_idempotence)
8089
self._impl = MultiplexedPublisherClient(
8190
lambda topic: make_publisher(
8291
topic=topic,
8392
per_partition_batching_settings=per_partition_batching_settings,
8493
credentials=credentials,
8594
client_options=client_options,
8695
transport=transport,
96+
client_id=client_id,
8797
)
8898
)
89-
self._require_stared = RequireStarted()
99+
self._require_started = RequireStarted()
90100

91101
@overrides
92102
def publish(
@@ -96,21 +106,21 @@ def publish(
96106
ordering_key: str = "",
97107
**attrs: Mapping[str, str],
98108
) -> "Future[str]":
99-
self._require_stared.require_started()
109+
self._require_started.require_started()
100110
return self._impl.publish(
101111
topic=topic, data=data, ordering_key=ordering_key, **attrs
102112
)
103113

104114
@overrides
105115
def __enter__(self):
106-
self._require_stared.__enter__()
116+
self._require_started.__enter__()
107117
self._impl.__enter__()
108118
return self
109119

110120
@overrides
111121
def __exit__(self, exc_type, exc_value, traceback):
112122
self._impl.__exit__(exc_type, exc_value, traceback)
113-
self._require_stared.__exit__(exc_type, exc_value, traceback)
123+
self._require_started.__exit__(exc_type, exc_value, traceback)
114124

115125

116126
class AsyncPublisherClient(
@@ -124,7 +134,7 @@ class AsyncPublisherClient(
124134
"""
125135

126136
_impl: AsyncPublisherClientInterface
127-
_require_stared: RequireStarted
137+
_require_started: RequireStarted
128138

129139
DEFAULT_BATCHING_SETTINGS = WIRE_DEFAULT_BATCHING
130140
"""
@@ -138,6 +148,7 @@ def __init__(
138148
credentials: Optional[Credentials] = None,
139149
transport: str = "grpc_asyncio",
140150
client_options: Optional[ClientOptions] = None,
151+
enable_idempotence: bool = True,
141152
):
142153
"""
143154
Create a new AsyncPublisherClient.
@@ -147,17 +158,20 @@ def __init__(
147158
credentials: If provided, the credentials to use when connecting.
148159
transport: The transport to use. Must correspond to an asyncio transport.
149160
client_options: The client options to use when connecting. If used, must explicitly set `api_endpoint`.
161+
enable_idempotence: Whether idempotence is enabled, where the server will ensure that unique messages within a single publisher session are stored only once.
150162
"""
163+
client_id = _get_client_id(enable_idempotence)
151164
self._impl = MultiplexedAsyncPublisherClient(
152165
lambda topic: make_async_publisher(
153166
topic=topic,
154167
per_partition_batching_settings=per_partition_batching_settings,
155168
credentials=credentials,
156169
client_options=client_options,
157170
transport=transport,
171+
client_id=client_id,
158172
)
159173
)
160-
self._require_stared = RequireStarted()
174+
self._require_started = RequireStarted()
161175

162176
@overrides
163177
async def publish(
@@ -167,18 +181,18 @@ async def publish(
167181
ordering_key: str = "",
168182
**attrs: Mapping[str, str],
169183
) -> str:
170-
self._require_stared.require_started()
184+
self._require_started.require_started()
171185
return await self._impl.publish(
172186
topic=topic, data=data, ordering_key=ordering_key, **attrs
173187
)
174188

175189
@overrides
176190
async def __aenter__(self):
177-
self._require_stared.__enter__()
191+
self._require_started.__enter__()
178192
await self._impl.__aenter__()
179193
return self
180194

181195
@overrides
182196
async def __aexit__(self, exc_type, exc_value, traceback):
183197
await self._impl.__aexit__(exc_type, exc_value, traceback)
184-
self._require_stared.__exit__(exc_type, exc_value, traceback)
198+
self._require_started.__exit__(exc_type, exc_value, traceback)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# Copyright 2023 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import NamedTuple
16+
17+
18+
class PublishSequenceNumber(NamedTuple):
19+
value: int
20+
21+
def next(self) -> "PublishSequenceNumber":
22+
return PublishSequenceNumber(self.value + 1)
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# Copyright 2023 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import NamedTuple
16+
17+
18+
class PublisherClientId(NamedTuple):
19+
value: bytes

google/cloud/pubsublite/internal/wire/make_publisher.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818

1919
from google.cloud.pubsublite.admin_client import AdminClient
2020
from google.cloud.pubsublite.internal.endpoints import regional_endpoint
21+
from google.cloud.pubsublite.internal.publisher_client_id import PublisherClientId
22+
from google.cloud.pubsublite.internal.publish_sequence_number import (
23+
PublishSequenceNumber,
24+
)
2125
from google.cloud.pubsublite.internal.wire.client_cache import ClientCache
2226
from google.cloud.pubsublite.internal.wire.default_routing_policy import (
2327
DefaultRoutingPolicy,
@@ -60,6 +64,7 @@ def make_publisher(
6064
credentials: Optional[Credentials] = None,
6165
client_options: Optional[ClientOptions] = None,
6266
metadata: Optional[Mapping[str, str]] = None,
67+
client_id: Optional[PublisherClientId] = None,
6368
) -> Publisher:
6469
"""
6570
Make a new publisher for the given topic.
@@ -71,6 +76,7 @@ def make_publisher(
7176
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
7277
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
7378
metadata: Additional metadata to send with the RPC.
79+
client_id: 128-bit unique client id. If set, enables publish idempotency for the session.
7480
7581
Returns:
7682
A new Publisher.
@@ -104,10 +110,16 @@ def connection_factory(requests: AsyncIterator[PublishRequest]):
104110
requests, metadata=list(final_metadata.items())
105111
)
106112

113+
initial_request = InitialPublishRequest(
114+
topic=str(topic), partition=partition.value
115+
)
116+
if client_id:
117+
initial_request.client_id = client_id.value
107118
return SinglePartitionPublisher(
108-
InitialPublishRequest(topic=str(topic), partition=partition.value),
119+
initial_request,
109120
per_partition_batching_settings,
110121
GapicConnectionFactory(connection_factory),
122+
PublishSequenceNumber(0),
111123
)
112124

113125
def policy_factory(partition_count: int):

0 commit comments

Comments
 (0)