Skip to content

Commit f72a2f0

Browse files
feat: Implement RoutingPolicy (#5)
* feat: Implement python retrying connection, which generically retries stream errors. * fix: Add asynctest to tests_require. * feat: Implement DefaultRouingPolicy * fix: Add class comments. * feat: Implement python retrying connection, which generically retries stream errors (#4) * feat: Implement python retrying connection, which generically retries stream errors. * fix: Add asynctest to tests_require. * fix: Add class comments. * feat: Implement DefaultRouingPolicy * fix: Add class comments. * docs: Add comment to DefaultRoutingPolicy.
1 parent 11c9a69 commit f72a2f0

5 files changed

Lines changed: 97 additions & 0 deletions

File tree

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import hashlib
2+
import random
3+
4+
from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy
5+
from google.cloud.pubsublite.partition import Partition
6+
from google.cloud.pubsublite_v1.types import PubSubMessage
7+
8+
9+
class DefaultRoutingPolicy(RoutingPolicy):
10+
"""
11+
The default routing policy which routes based on sha256 % num_partitions using the key if set or round robin if
12+
unset.
13+
"""
14+
_num_partitions: int
15+
_current_round_robin: Partition
16+
17+
def __init__(self, num_partitions: int):
18+
self._num_partitions = num_partitions
19+
self._current_round_robin = Partition(random.randint(0, num_partitions))
20+
21+
def route(self, message: PubSubMessage) -> Partition:
22+
"""Route the message using the key if set or round robin if unset."""
23+
if not message.key:
24+
result = Partition(self._current_round_robin.value)
25+
self._current_round_robin.value = (self._current_round_robin.value + 1) % self._num_partitions
26+
return result
27+
sha = hashlib.sha256()
28+
sha.update(message.key)
29+
as_int = int.from_bytes(sha.digest(), byteorder='big')
30+
return Partition(as_int % self._num_partitions)
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from abc import ABC, abstractmethod
2+
3+
from google.cloud.pubsublite.partition import Partition
4+
from google.cloud.pubsublite_v1.types.common import PubSubMessage
5+
6+
7+
class RoutingPolicy(ABC):
8+
"""A policy for how to route messages."""
9+
@abstractmethod
10+
def route(self, message: PubSubMessage) -> Partition:
11+
"""
12+
Route a message to a given partition.
13+
Args:
14+
message: The message to route
15+
16+
Returns: The partition to route to
17+
18+
"""
19+
raise NotImplementedError()
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from typing import NamedTuple
2+
3+
4+
class Partition(NamedTuple):
5+
value: int
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import json
2+
import os
3+
4+
from google.cloud.pubsublite.partition import Partition
5+
6+
from google.cloud.pubsublite.internal.wire.default_routing_policy import DefaultRoutingPolicy
7+
from google.cloud.pubsublite_v1 import PubSubMessage
8+
9+
10+
def test_routing_cases():
11+
policy = DefaultRoutingPolicy(num_partitions=29)
12+
json_list = []
13+
with open(os.path.join(os.path.dirname(__file__), "routing_tests.json")) as f:
14+
for line in f:
15+
if not line.startswith("//"):
16+
json_list.append(line)
17+
18+
loaded = json.loads("\n".join(json_list))
19+
target = {bytes(k, 'utf-8'): Partition(v) for k, v in loaded.items()}
20+
result = {}
21+
for key in target:
22+
result[key] = policy.route(PubSubMessage(key=key))
23+
assert result == target
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// File defining a map from routing keys to the expected partition in a 29
2+
// partition topic for test purposes. This file should be copied into all client
3+
// library test suites.
4+
{
5+
"oaisdhfoiahsd": 18,
6+
"P(#*YNPOIUDF": 9,
7+
"LCIUNDFPOASIUN":8,
8+
";odsfiupoius": 9,
9+
"OPISUDfpoiu": 2,
10+
"dokjwO:IDf": 21,
11+
"%^&*": 19,
12+
"XXXXXXXXX": 15,
13+
"dpcollins": 28,
14+
"#()&$IJHLOIURF": 2,
15+
"dfasiduyf": 6,
16+
"983u2poer": 3,
17+
"8888888": 6,
18+
"OPUIPOUYPOIOPUIOIPUOUIPJOP": 2,
19+
"x": 16
20+
}

0 commit comments

Comments
 (0)