Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/scheduler/properties.py: 95.10%
102 statements
« prev ^ index » next coverage.py v7.4.1, created at 2025-04-14 16:27 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2025-04-14 16:27 +0000
1# Copyright (C) 2024 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import hashlib
15import json
16from collections import defaultdict
17from dataclasses import dataclass
18from itertools import chain, combinations
19from typing import Iterable, Protocol
21from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Platform
22from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import BotSession
23from buildgrid.server.exceptions import FailedPreconditionError
24from buildgrid.server.logging import buildgrid_logger
# Number of flattened (key, value) capability pairs above which a warning is
# logged while computing a worker's partial property combinations.
CAPABILITIES_WARNING_THRESHOLD = 10

# Module-level logger, named after this module.
LOGGER = buildgrid_logger(__name__)
def hash_from_dict(dictionary: dict[str, list[str]]) -> str:
    """Get the hash representation of a dictionary.

    The dictionary is serialised to JSON with its keys sorted, so two
    dictionaries with the same contents produce the same hash regardless of
    key insertion order. The result is the SHA-1 hex digest of that JSON text.
    """
    canonical_json = json.dumps(dictionary, sort_keys=True)
    digest = hashlib.sha1(canonical_json.encode())
    return digest.hexdigest()
class PropertySet(Protocol):
    """Structural interface for translating platforms and bot sessions into scheduling properties."""

    def execution_properties(self, platform: Platform) -> tuple[str, dict[str, list[str]]]:
        """
        Parse a platform value into the match properties used for scheduling.

        Returns a ``(label, properties)`` pair; the label can be used for applying metrics.
        """
        ...

    def worker_properties(self, bot_session: BotSession) -> list[dict[str, list[str]]]:
        """
        Find every valid property combination which can be used to assign work to a bot.
        """
        ...

    def bot_property_labels(self, bot_session: BotSession) -> list[str]:
        """
        Find all labels which can be used to identify the bot's type in logging and metrics.
        """
        ...
class DynamicPropertySet:
    """
    Property set driven by configured key sets.

    Execution platform properties must use a key registered as either a match
    key or a wildcard key. Only match keys are returned for scheduling;
    wildcard keys are accepted but left out of the match properties.
    """

    def __init__(
        self,
        *,
        unique_property_keys: set[str],
        match_property_keys: set[str],
        wildcard_property_keys: set[str],
        label_key: str | None = None,
    ) -> None:
        """
        Validate and store the key configuration.

        Raises:
            ValueError: if a unique key is neither a match nor a wildcard key,
                or if ``label_key`` is set but is neither a match nor a wildcard key.
        """
        # Every unique key has to be registered as a match or wildcard key.
        unregistered_unique_keys = (unique_property_keys - match_property_keys) - wildcard_property_keys
        if unregistered_unique_keys:
            raise ValueError(f"Unique keys configured which are not match or wildcards: {unregistered_unique_keys}")

        # A label key is optional, but when given it must be a registered key.
        if label_key and not (label_key in match_property_keys or label_key in wildcard_property_keys):
            raise ValueError(f"Label key is not registered as a match or wildcard key: {label_key}")

        self.label_key = label_key
        # Copy the configured sets so the instance does not alias the caller's objects.
        self.unique_property_keys = set(unique_property_keys)
        self.match_property_keys = set(match_property_keys)
        self.wildcard_property_keys = set(wildcard_property_keys)
        self.all_property_keys = match_property_keys | wildcard_property_keys

    def execution_properties(self, platform: Platform) -> tuple[str, dict[str, list[str]]]:
        """
        Group the platform's properties by name and validate them.

        Returns a ``(label, properties)`` pair. The label is the smallest value
        set for ``label_key`` or ``"unknown"`` when that key is absent. The
        properties hold the sorted values of each match key.

        Raises:
            FailedPreconditionError: if a property name is not registered, or a
                unique property is given more than one distinct value.
        """
        grouped: dict[str, set[str]] = {}
        for platform_property in platform.properties:
            grouped.setdefault(platform_property.name, set()).add(platform_property.value)

        for name, values in grouped.items():
            if name not in self.all_property_keys:
                raise FailedPreconditionError(
                    f"Unregistered platform property [{name}={values}]."
                    f" Known properties are: [{self.all_property_keys}]"
                )
            if name in self.unique_property_keys and len(values) > 1:
                raise FailedPreconditionError(
                    f"Unique platform property [{name}] can only be set once. Got: [{values}]"
                )

        # Value sets are never empty here, so min() is the first value in sorted order.
        label = min(grouped[self.label_key]) if self.label_key in grouped else "unknown"

        match_properties: dict[str, list[str]] = {}
        for key, key_values in grouped.items():
            if key in self.match_property_keys:
                match_properties[key] = sorted(key_values)
        return label, match_properties

    def worker_properties(self, bot_session: BotSession) -> list[dict[str, list[str]]]:
        """
        Return every partial combination of the bot's properties, restricted to match keys.
        """
        matchable = {
            key: values for key, values in bot_properties(bot_session).items() if key in self.match_property_keys
        }
        return partial_bot_properties(matchable)

    def bot_property_labels(self, bot_session: BotSession) -> list[str]:
        """
        Return the sorted values the bot reports for ``label_key``, or an empty list if it has none.
        """
        properties = bot_properties(bot_session)
        if self.label_key not in properties:
            return []
        return sorted(properties[self.label_key])
# A set of (property name, property value) pairs.
Properties = set[tuple[str, str]]


@dataclass
class PropertyLabel:
    """A label paired with the set of property pairs it stands for."""

    # Name used for this set of properties.
    label: str
    # The (name, value) pairs belonging to the label.
    properties: Properties
class StaticPropertySet:
    """
    Property set built from a fixed list of labelled property sets.

    Properties whose key is a wildcard key are ignored when matching platforms
    and bots against the configured labels.
    """

    def __init__(
        self,
        *,
        property_labels: list[PropertyLabel],
        wildcard_property_keys: set[str],
    ) -> None:
        """Store the configured labels and wildcard keys."""
        self.wildcard_property_keys = wildcard_property_keys
        self.property_labels = property_labels

    def execution_properties(self, platform: Platform) -> tuple[str, dict[str, list[str]]]:
        """
        Return the first configured label whose properties contain all of the platform's
        non-wildcard properties, together with that label's merged properties.

        Raises:
            FailedPreconditionError: if no configured label covers the platform's properties.
        """
        execute_properties = set()
        for platform_property in platform.properties:
            if platform_property.name in self.wildcard_property_keys:
                continue
            execute_properties.add((platform_property.name, platform_property.value))

        for candidate in self.property_labels:
            # The platform may request any subset of a label's properties.
            if execute_properties <= candidate.properties:
                return candidate.label, merge_property_pairs(candidate.properties)

        raise FailedPreconditionError(f"Could not find property set for {execute_properties}")

    def worker_properties(self, bot_session: BotSession) -> list[dict[str, list[str]]]:
        """
        Return the merged properties of every configured label that the bot fully satisfies.

        Raises:
            FailedPreconditionError: if the bot satisfies none of the configured labels.
        """
        bots_properties = bot_properties(bot_session)

        property_pairs = set()
        for key, values in bots_properties.items():
            if key in self.wildcard_property_keys:
                continue
            for value in values:
                property_pairs.add((key, value))

        # A label matches only when the bot offers every one of its properties.
        matching = [
            merge_property_pairs(candidate.properties)
            for candidate in self.property_labels
            if candidate.properties <= property_pairs
        ]
        if not matching:
            raise FailedPreconditionError(f"Could not find property set for {bots_properties}")

        return [{name: sorted(values) for name, values in merged.items()} for merged in matching]

    def bot_property_labels(self, bot_session: BotSession) -> list[str]:
        """
        Return the labels whose properties are all present on the bot.
        """
        bot_props = bot_properties(bot_session)

        labels = []
        for candidate in self.property_labels:
            # To gain a label the bot must have all the properties for that label.
            if all(value in bot_props.get(key, ()) for key, value in candidate.properties):
                labels.append(candidate.label)
        return labels
def bot_properties(bot_session: BotSession) -> dict[str, set[str]]:
    """
    Collect the properties of the bot's primary device, grouped by key.

    Returns a dict mapping each property key to the set of values reported for
    it, or an empty dict when the worker has no devices.
    """
    devices = bot_session.worker.devices
    if not devices:
        return {}

    # According to the spec:
    #   "The first device in the worker is the "primary device" -
    #   that is, the device running a bot and which is
    #   responsible for actually executing commands."
    primary_device = devices[0]

    capabilities: dict[str, set[str]] = {}
    for device_property in primary_device.properties:
        capabilities.setdefault(device_property.key, set()).add(device_property.value)
    return capabilities
def partial_bot_properties(properties: dict[str, set[str]]) -> list[dict[str, list[str]]]:
    """
    Build every combination of the given properties.

    The properties are flattened into (name, value) pairs and each subset of
    those pairs, from the empty subset up to the full set, is merged back into
    a dict. The number of results doubles with every extra pair, so a warning
    is logged when there are more than CAPABILITIES_WARNING_THRESHOLD pairs.
    """
    property_pairs = flatten_properties(properties)
    pair_count = len(property_pairs)

    if pair_count > CAPABILITIES_WARNING_THRESHOLD:
        LOGGER.warning(
            "A worker with a large capabilities dictionary has been connected. "
            f"Processing its capabilities may take a while. Capabilities: {property_pairs}"
        )

    # Powerset of the pairs: every subset of size 0, then size 1, ... up to all pairs.
    return [
        merge_property_pairs(subset)
        for size in range(pair_count + 1)
        for subset in combinations(property_pairs, size)
    ]
def flatten_properties(properties: dict[str, set[str]]) -> list[tuple[str, str]]:
    """
    Flatten a name -> values mapping into a list of (name, value) pairs.

    Pairs are ordered by name and then by value.
    """
    pairs: list[tuple[str, str]] = []
    for name in sorted(properties):
        pairs.extend((name, value) for value in sorted(properties[name]))
    return pairs
def merge_property_pairs(property_pairs: Iterable[tuple[str, str]]) -> dict[str, list[str]]:
    """
    Group (name, value) pairs by name.

    Returns a plain dict mapping each name to the sorted list of its values.
    Names keep the order in which they are first seen, and repeated pairs are
    kept as repeated values.
    """
    grouped: defaultdict[str, list[str]] = defaultdict(list)
    for name, value in property_pairs:
        grouped[name].append(value)
    return {name: sorted(values) for name, values in grouped.items()}