Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/scheduler/properties.py: 95.70%
93 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2024 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import hashlib
15import json
16from collections import defaultdict
17from dataclasses import dataclass
18from itertools import chain, combinations
19from typing import Dict, Iterable, List, Optional, Protocol, Set, Tuple
21from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Platform
22from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import BotSession
23from buildgrid.server.exceptions import FailedPreconditionError
24from buildgrid.server.logging import buildgrid_logger
# Module-level logger, named after this module.
LOGGER = buildgrid_logger(__name__)

# When a worker's flattened (key, value) capability list is longer than this,
# partial_bot_properties() logs a warning, because it goes on to build the
# powerset of those pairs.
CAPABILITIES_WARNING_THRESHOLD = 10
def hash_from_dict(dictionary: Dict[str, List[str]]) -> str:
    """Return the hex SHA-1 digest of a dictionary's JSON representation.

    The dictionary is serialized with sorted keys, so dictionaries with equal
    contents produce the same digest regardless of key insertion order.
    """
    serialized = json.dumps(dictionary, sort_keys=True)
    digest = hashlib.sha1(serialized.encode())
    return digest.hexdigest()
class PropertySet(Protocol):
    """Structural interface shared by the property-set implementations in this module."""

    def execution_properties(self, platform: Platform) -> Tuple[str, Dict[str, List[str]]]:
        """
        Parse a platform value into the match properties used for scheduling.
        The returned label can be used for applying metrics.
        """
        ...

    def worker_properties(self, bot_session: BotSession) -> List[Dict[str, List[str]]]:
        """
        Return every valid property combination which can be used to assign work to a bot.
        """
        ...
class DynamicPropertySet:
    """Property set configured by unique, match and wildcard property keys."""

    def __init__(
        self,
        *,
        unique_property_keys: Set[str],
        match_property_keys: Set[str],
        wildcard_property_keys: Set[str],
        label_key: Optional[str] = None,
    ) -> None:
        """
        Validate and store the key configuration.

        Raises:
            ValueError: if a unique key is neither a match nor a wildcard key,
                or if ``label_key`` is set but is neither a match nor a wildcard key.
        """
        # Every unique key must also be registered as a match or wildcard key.
        unregistered_unique_keys = (unique_property_keys - match_property_keys) - wildcard_property_keys
        if unregistered_unique_keys:
            raise ValueError(f"Unique keys configured which are not match or wildcards: {unregistered_unique_keys}")

        # The same registration requirement applies to the label key when one is given.
        if label_key and not (label_key in match_property_keys or label_key in wildcard_property_keys):
            raise ValueError(f"Label key is not registered as a match or wildcard key: {label_key}")

        self.label_key = label_key
        self.all_property_keys = match_property_keys | wildcard_property_keys
        # Keep private copies of the configured key sets.
        self.wildcard_property_keys = set(wildcard_property_keys)
        self.match_property_keys = set(match_property_keys)
        self.unique_property_keys = set(unique_property_keys)

    def execution_properties(self, platform: Platform) -> Tuple[str, Dict[str, List[str]]]:
        """
        Group the platform's properties by name and validate them.

        Returns:
            A ``(label, properties)`` tuple. ``label`` is the smallest value given
            for ``label_key``, or ``"unknown"`` when that key is absent.
            ``properties`` maps each match key present on the platform to its
            sorted values; keys outside the match set are left out.

        Raises:
            FailedPreconditionError: if a property name is not a registered key,
                or a unique key was given more than one distinct value.
        """
        grouped: Dict[str, Set[str]] = {}
        for entry in platform.properties:
            grouped.setdefault(entry.name, set()).add(entry.value)

        label = min(grouped[self.label_key]) if self.label_key in grouped else "unknown"

        for name, values in grouped.items():
            if name not in self.all_property_keys:
                raise FailedPreconditionError(
                    f"Unregistered platform property [{name}={values}]."
                    f" Known properties are: [{self.all_property_keys}]"
                )
            if len(values) > 1 and name in self.unique_property_keys:
                raise FailedPreconditionError(
                    f"Unique platform property [{name}] can only be set once. Got: [{values}]"
                )

        # Only match keys take part in the returned scheduling properties.
        result: Dict[str, List[str]] = {}
        for name, values in grouped.items():
            if name in self.match_property_keys:
                result[name] = sorted(values)
        return label, result

    def worker_properties(self, bot_session: BotSession) -> List[Dict[str, List[str]]]:
        """
        Return the partial property combinations for a bot.

        The bot's properties are restricted to the match keys and then passed
        to ``partial_bot_properties``.
        """
        matchable = {
            key: values
            for key, values in bot_properties(bot_session).items()
            if key in self.match_property_keys
        }
        return partial_bot_properties(matchable)
# A set of (property name, property value) pairs.
Properties = Set[Tuple[str, str]]


@dataclass
class PropertyLabel:
    """A label paired with the set of property pairs it stands for."""

    # Label returned by StaticPropertySet.execution_properties for this set.
    label: str
    # The (name, value) pairs belonging to this label.
    properties: Properties
class StaticPropertySet:
    """Property set backed by a fixed list of labelled property combinations."""

    def __init__(
        self,
        *,
        property_labels: List[PropertyLabel],
        wildcard_property_keys: Set[str],
    ) -> None:
        """Store the labelled property sets and the wildcard keys to ignore when matching."""
        self.wildcard_property_keys = wildcard_property_keys
        self.property_labels = property_labels

    def execution_properties(self, platform: Platform) -> Tuple[str, Dict[str, List[str]]]:
        """
        Find the first configured property set containing all of the platform's
        non-wildcard properties.

        Returns:
            The matching label and that label's properties merged by name.

        Raises:
            FailedPreconditionError: if no configured property set contains them.
        """
        execute_properties = set()
        for platform_property in platform.properties:
            if platform_property.name in self.wildcard_property_keys:
                continue
            execute_properties.add((platform_property.name, platform_property.value))

        for candidate in self.property_labels:
            # The requested pairs must all be present in the candidate set.
            if execute_properties <= candidate.properties:
                return candidate.label, merge_property_pairs(candidate.properties)

        raise FailedPreconditionError(f"Could not find property set for {execute_properties}")

    def worker_properties(self, bot_session: BotSession) -> List[Dict[str, List[str]]]:
        """
        Return the merged properties of every configured property set that the
        bot's non-wildcard properties fully cover.

        Raises:
            FailedPreconditionError: if the bot covers none of the configured sets.
        """
        bots_properties = bot_properties(bot_session)

        available = set()
        for key, values in bots_properties.items():
            for value in values:
                if key not in self.wildcard_property_keys:
                    available.add((key, value))

        covered = [
            merge_property_pairs(candidate.properties)
            for candidate in self.property_labels
            if candidate.properties <= available
        ]
        if not covered:
            raise FailedPreconditionError(f"Could not find property set for {bots_properties}")

        normalized = []
        for props in covered:
            normalized.append({name: sorted(values) for name, values in props.items()})
        return normalized
def bot_properties(bot_session: BotSession) -> Dict[str, Set[str]]:
    """
    Collect the properties of a bot session's primary device.

    Returns a mapping of property key to the set of values reported for it,
    or an empty mapping when the worker lists no devices.
    """
    devices = bot_session.worker.devices
    if not devices:
        return {}

    # According to the spec:
    #   "The first device in the worker is the "primary device" -
    #   that is, the device running a bot and which is
    #   responsible for actually executing commands."
    capabilities: Dict[str, Set[str]] = {}
    for device_property in devices[0].properties:
        capabilities.setdefault(device_property.key, set()).add(device_property.value)
    return capabilities
def partial_bot_properties(properties: Dict[str, Set[str]]) -> List[Dict[str, List[str]]]:
    """
    Build the merged property dictionary for every subset of a bot's property pairs.

    The properties are flattened into (name, value) pairs, and each subset of
    those pairs (from the empty subset up to the full set) is merged back into
    a dictionary. A warning is logged when the number of pairs exceeds
    CAPABILITIES_WARNING_THRESHOLD.
    """
    property_pairs = flatten_properties(properties)
    pair_count = len(property_pairs)

    if pair_count > CAPABILITIES_WARNING_THRESHOLD:
        LOGGER.warning(
            "A worker with a large capabilities dictionary has been connected. "
            f"Processing its capabilities may take a while. Capabilities: {property_pairs}"
        )

    # Walk the powerset of the pairs in order of increasing subset size.
    subsets: List[Dict[str, List[str]]] = []
    for size in range(pair_count + 1):
        for subset in combinations(property_pairs, size):
            subsets.append(merge_property_pairs(subset))
    return subsets
def flatten_properties(properties: Dict[str, Set[str]]) -> List[Tuple[str, str]]:
    """
    Flatten a name -> values mapping into a list of (name, value) pairs.

    Pairs are ordered by name, then by value within each name.
    """
    pairs: List[Tuple[str, str]] = []
    for name in sorted(properties):
        for value in sorted(properties[name]):
            pairs.append((name, value))
    return pairs
def merge_property_pairs(property_pairs: Iterable[Tuple[str, str]]) -> Dict[str, List[str]]:
    """
    Group (name, value) pairs into a mapping of name -> sorted list of values.

    Names keep the order in which they first appear; repeated values are kept.
    """
    grouped: Dict[str, List[str]] = {}
    for name, value in property_pairs:
        if name not in grouped:
            grouped[name] = []
        grouped[name].append(value)

    for values in grouped.values():
        values.sort()
    return grouped