Skip to content

Commit 4276882

Browse files
fix: Assorted fixes to the publish layer and internals. (#39)
* fix: Assorted fixes to the publish layer and internals. * chore: Run blacken
1 parent 0d6f0f4 commit 4276882

5 files changed

Lines changed: 26 additions & 14 deletions

File tree

google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@ async def publish(
2525
psl_message = from_cps_publish_message(cps_message)
2626
return (await self._publisher.publish(psl_message)).encode()
2727

28-
def __aenter__(self):
29-
self._publisher.__aenter__()
28+
async def __aenter__(self):
29+
await self._publisher.__aenter__()
3030
return self
3131

32-
def __aexit__(self, exc_type, exc_value, traceback):
33-
self._publisher.__aexit__(exc_type, exc_value, traceback)
32+
async def __aexit__(self, exc_type, exc_value, traceback):
33+
await self._publisher.__aexit__(exc_type, exc_value, traceback)

google/cloud/pubsublite/internal/wire/default_routing_policy.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,15 @@ class DefaultRoutingPolicy(RoutingPolicy):
1717

1818
def __init__(self, num_partitions: int):
1919
self._num_partitions = num_partitions
20-
self._current_round_robin = Partition(random.randint(0, num_partitions))
20+
self._current_round_robin = Partition(random.randint(0, num_partitions - 1))
2121

2222
def route(self, message: PubSubMessage) -> Partition:
2323
"""Route the message using the key if set or round robin if unset."""
2424
if not message.key:
2525
result = Partition(self._current_round_robin.value)
26-
self._current_round_robin.value = (
27-
self._current_round_robin.value + 1
28-
) % self._num_partitions
26+
self._current_round_robin = Partition(
27+
(self._current_round_robin.value + 1) % self._num_partitions
28+
)
2929
return result
3030
sha = hashlib.sha256()
3131
sha.update(message.key)

google/cloud/pubsublite/internal/wire/permanent_failable.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,17 @@
99
class PermanentFailable:
1010
"""A class that can experience permanent failures, with helpers for forwarding these to client actions."""
1111

12-
_failure_task: asyncio.Future
12+
_maybe_failure_task: Optional[asyncio.Future]
1313

1414
def __init__(self):
15-
self._failure_task = asyncio.Future()
15+
self._maybe_failure_task = None
16+
17+
@property
18+
def _failure_task(self) -> asyncio.Future:
19+
"""Get the failure task, initializing it lazily, since it needs to be initialized in the event loop."""
20+
if self._maybe_failure_task is None:
21+
self._maybe_failure_task = asyncio.Future()
22+
return self._maybe_failure_task
1623

1724
async def await_unless_failed(self, awaitable: Awaitable[T]) -> T:
1825
"""

google/cloud/pubsublite/location.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,9 @@
66
class CloudRegion(NamedTuple):
77
name: str
88

9+
def __str__(self):
10+
return self.name
11+
912

1013
class CloudZone(NamedTuple):
1114
region: CloudRegion

google/cloud/pubsublite/routing_metadata.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@
88

99

1010
def topic_routing_metadata(topic: TopicPath, partition: Partition) -> Mapping[str, str]:
11-
encoded = urlencode(topic)
12-
return {_PARAMS_HEADER: f"partition={partition.value}&topic={encoded}"}
11+
encoded = urlencode({"partition": str(partition.value), "topic": str(topic)})
12+
return {_PARAMS_HEADER: encoded}
1313

1414

1515
def subscription_routing_metadata(
1616
subscription: SubscriptionPath, partition: Partition
1717
) -> Mapping[str, str]:
18-
encoded = urlencode(subscription)
19-
return {_PARAMS_HEADER: f"partition={partition.value}&subscription={encoded}"}
18+
encoded = urlencode(
19+
{"partition": str(partition.value), "subscription": str(subscription)}
20+
)
21+
return {_PARAMS_HEADER: encoded}

0 commit comments

Comments
 (0)