File tree Expand file tree Collapse file tree
tests/unit/pubsublite/internal/wire Expand file tree Collapse file tree Original file line number Diff line number Diff line change 1+ import hashlib
2+ import random
3+
4+ from google .cloud .pubsublite .internal .wire .routing_policy import RoutingPolicy
5+ from google .cloud .pubsublite .partition import Partition
6+ from google .cloud .pubsublite_v1 .types import PubSubMessage
7+
8+
9+ class DefaultRoutingPolicy (RoutingPolicy ):
10+ """
11+ The default routing policy which routes based on sha256 % num_partitions using the key if set or round robin if
12+ unset.
13+ """
14+ _num_partitions : int
15+ _current_round_robin : Partition
16+
17+ def __init__ (self , num_partitions : int ):
18+ self ._num_partitions = num_partitions
19+ self ._current_round_robin = Partition (random .randint (0 , num_partitions ))
20+
21+ def route (self , message : PubSubMessage ) -> Partition :
22+ """Route the message using the key if set or round robin if unset."""
23+ if not message .key :
24+ result = Partition (self ._current_round_robin .value )
25+ self ._current_round_robin .value = (self ._current_round_robin .value + 1 ) % self ._num_partitions
26+ return result
27+ sha = hashlib .sha256 ()
28+ sha .update (message .key )
29+ as_int = int .from_bytes (sha .digest (), byteorder = 'big' )
30+ return Partition (as_int % self ._num_partitions )
Original file line number Diff line number Diff line change 1+ from abc import ABC , abstractmethod
2+
3+ from google .cloud .pubsublite .partition import Partition
4+ from google .cloud .pubsublite_v1 .types .common import PubSubMessage
5+
6+
7+ class RoutingPolicy (ABC ):
8+ """A policy for how to route messages."""
9+ @abstractmethod
10+ def route (self , message : PubSubMessage ) -> Partition :
11+ """
12+ Route a message to a given partition.
13+ Args:
14+ message: The message to route
15+
16+ Returns: The partition to route to
17+
18+ """
19+ raise NotImplementedError ()
Original file line number Diff line number Diff line change 1+ from typing import NamedTuple
2+
3+
4+ class Partition (NamedTuple ):
5+ value : int
Original file line number Diff line number Diff line change 1+ import json
2+ import os
3+
4+ from google .cloud .pubsublite .partition import Partition
5+
6+ from google .cloud .pubsublite .internal .wire .default_routing_policy import DefaultRoutingPolicy
7+ from google .cloud .pubsublite_v1 import PubSubMessage
8+
9+
10+ def test_routing_cases ():
11+ policy = DefaultRoutingPolicy (num_partitions = 29 )
12+ json_list = []
13+ with open (os .path .join (os .path .dirname (__file__ ), "routing_tests.json" )) as f :
14+ for line in f :
15+ if not line .startswith ("//" ):
16+ json_list .append (line )
17+
18+ loaded = json .loads ("\n " .join (json_list ))
19+ target = {bytes (k , 'utf-8' ): Partition (v ) for k , v in loaded .items ()}
20+ result = {}
21+ for key in target :
22+ result [key ] = policy .route (PubSubMessage (key = key ))
23+ assert result == target
Original file line number Diff line number Diff line change 1+ // File defining a map from routing keys to the expected partition in a 29
2+ // partition topic for test purposes. This file should be copied into all client
3+ // library test suites.
4+ {
5+ "oaisdhfoiahsd" : 18 ,
6+ "P(#*YNPOIUDF" : 9 ,
7+ "LCIUNDFPOASIUN" :8 ,
8+ ";odsfiupoius" : 9 ,
9+ "OPISUDfpoiu" : 2 ,
10+ "dokjwO:IDf" : 21 ,
11+ "%^&*" : 19 ,
12+ "XXXXXXXXX" : 15 ,
13+ "dpcollins" : 28 ,
14+ "#()&$IJHLOIURF" : 2 ,
15+ "dfasiduyf" : 6 ,
16+ "983u2poer" : 3 ,
17+ "8888888" : 6 ,
18+ "OPUIPOUYPOIOPUIOIPUOUIPJOP" : 2 ,
19+ "x" : 16
20+ }
You can’t perform that action at this time.
0 commit comments