|
| 1 | +from typing import Dict |
| 2 | + |
| 3 | +from google.cloud.pubsublite.internal.wire.publisher import Publisher |
| 4 | +from google.cloud.pubsublite.internal.wire.routing_policy import RoutingPolicy |
| 5 | +from google.cloud.pubsublite.partition import Partition |
| 6 | +from google.cloud.pubsublite.publish_metadata import PublishMetadata |
| 7 | +from google.cloud.pubsublite_v1 import PubSubMessage |
| 8 | + |
| 9 | + |
| 10 | +class RoutingPublisher(Publisher): |
| 11 | + _routing_policy: RoutingPolicy |
| 12 | + _publishers: Dict[Partition, Publisher] |
| 13 | + |
| 14 | + def __init__(self, routing_policy: RoutingPolicy, publishers: Dict[Partition, Publisher]): |
| 15 | + self._routing_policy = routing_policy |
| 16 | + self._publishers = publishers |
| 17 | + |
| 18 | + async def __aenter__(self): |
| 19 | + for publisher in self._publishers.values(): |
| 20 | + await publisher.__aenter__() |
| 21 | + |
| 22 | + async def __aexit__(self, exc_type, exc_val, exc_tb): |
| 23 | + for publisher in self._publishers.values(): |
| 24 | + await publisher.__aexit__(exc_type, exc_val, exc_tb) |
| 25 | + |
| 26 | + async def publish(self, message: PubSubMessage) -> PublishMetadata: |
| 27 | + partition = self._routing_policy.route(message) |
| 28 | + assert partition in self._publishers |
| 29 | + return await self._publishers[partition].publish(message) |
0 commit comments