Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/scheduler/properties.py: 95.10%

102 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2025-04-14 16:27 +0000

1# Copyright (C) 2024 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14import hashlib 

15import json 

16from collections import defaultdict 

17from dataclasses import dataclass 

18from itertools import chain, combinations 

19from typing import Iterable, Protocol 

20 

21from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Platform 

22from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import BotSession 

23from buildgrid.server.exceptions import FailedPreconditionError 

24from buildgrid.server.logging import buildgrid_logger 

25 

# Number of flattened (name, value) capability pairs above which
# partial_bot_properties logs a warning before building the powerset.
CAPABILITIES_WARNING_THRESHOLD = 10

# Logger for this module.
LOGGER = buildgrid_logger(__name__)

28 

29 

def hash_from_dict(dictionary: dict[str, list[str]]) -> str:
    """Return the SHA-1 hex digest of the dictionary's JSON encoding.

    Keys are sorted during serialization, so two dictionaries with the same
    content hash identically regardless of key insertion order.
    """
    serialized = json.dumps(dictionary, sort_keys=True)
    digest = hashlib.sha1(serialized.encode())
    return digest.hexdigest()

33 

34 

class PropertySet(Protocol):
    """Structural interface shared by the property-set implementations."""

    def execution_properties(self, platform: Platform) -> tuple[str, dict[str, list[str]]]:
        """
        Parse a platform value into the match properties used for scheduling.

        Returns a ``(label, properties)`` tuple; the label can be used for
        applying metrics.
        """
        ...

    def worker_properties(self, bot_session: BotSession) -> list[dict[str, list[str]]]:
        """
        Return every valid property combination which can be used to assign
        work to the bot.
        """
        ...

    def bot_property_labels(self, bot_session: BotSession) -> list[str]:
        """
        Return all labels which can be used to identify the bot's type in
        logging and metrics.
        """
        ...

51 

52 

class DynamicPropertySet:
    """
    Property set driven by configured key sets.

    Platform properties must use a match or wildcard key. Only match keys are
    returned for scheduling, and unique keys may carry at most one value.
    """

    def __init__(
        self,
        *,
        unique_property_keys: set[str],
        match_property_keys: set[str],
        wildcard_property_keys: set[str],
        label_key: str | None = None,
    ) -> None:
        """
        Validate and store the key configuration.

        Raises:
            ValueError: if a unique key is neither a match nor a wildcard key,
                or if a non-empty ``label_key`` is neither a match nor a
                wildcard key.
        """
        unregistered_unique_keys = (unique_property_keys - match_property_keys) - wildcard_property_keys
        if unregistered_unique_keys:
            raise ValueError(f"Unique keys configured which are not match or wildcards: {unregistered_unique_keys}")

        if label_key and not (label_key in match_property_keys or label_key in wildcard_property_keys):
            raise ValueError(f"Label key is not registered as a match or wildcard key: {label_key}")

        # Copy the caller's sets so the stored configuration is independent.
        self.unique_property_keys = set(unique_property_keys)
        self.match_property_keys = set(match_property_keys)
        self.wildcard_property_keys = set(wildcard_property_keys)
        self.all_property_keys = match_property_keys | wildcard_property_keys
        self.label_key = label_key

    def execution_properties(self, platform: Platform) -> tuple[str, dict[str, list[str]]]:
        """
        Group the platform's properties by name and validate them.

        Returns:
            ``(label, properties)`` where ``label`` is the smallest value set
            for ``label_key`` (or ``"unknown"`` when absent) and ``properties``
            maps each match key present to its sorted values.

        Raises:
            FailedPreconditionError: if a property name is not registered, or
                a unique property has more than one value.
        """
        grouped: dict[str, set[str]] = defaultdict(set)
        for entry in platform.properties:
            grouped[entry.name].add(entry.value)

        # Membership is tested first so the defaultdict never creates an entry
        # for a missing label key.
        label = min(grouped[self.label_key]) if self.label_key in grouped else "unknown"

        for name, values in grouped.items():
            if name not in self.all_property_keys:
                raise FailedPreconditionError(
                    f"Unregistered platform property [{name}={values}]."
                    f" Known properties are: [{self.all_property_keys}]"
                )
            if name in self.unique_property_keys and len(values) > 1:
                raise FailedPreconditionError(
                    f"Unique platform property [{name}] can only be set once. Got: [{values}]"
                )

        # Wildcard-only keys are validated above but left out of the result.
        result: dict[str, list[str]] = {}
        for key, key_values in grouped.items():
            if key in self.match_property_keys:
                result[key] = sorted(key_values)
        return label, result

    def worker_properties(self, bot_session: BotSession) -> list[dict[str, list[str]]]:
        """
        Return all partial combinations of the bot's match-key properties.
        """
        matchable = {
            key: values
            for key, values in bot_properties(bot_session).items()
            if key in self.match_property_keys
        }
        return partial_bot_properties(matchable)

    def bot_property_labels(self, bot_session: BotSession) -> list[str]:
        """
        Return the bot's sorted values for ``label_key``, or an empty list
        when the bot does not report that key.
        """
        reported = bot_properties(bot_session)
        return sorted(reported[self.label_key]) if self.label_key in reported else []

107 

108 

# A set of (name, value) property pairs.
Properties = set[tuple[str, str]]


@dataclass
class PropertyLabel:
    """Pairs a label with the set of property pairs it stands for."""

    # Label reported for this property set.
    label: str
    # The (name, value) pairs belonging to this label.
    properties: Properties

116 

117 

class StaticPropertySet:
    """
    Property set backed by a fixed list of labelled property collections.

    Properties whose key is a wildcard key are ignored when matching
    execution requests and workers against the configured labels.
    """

    def __init__(
        self,
        *,
        property_labels: list[PropertyLabel],
        wildcard_property_keys: set[str],
    ) -> None:
        """Store the configured labels and wildcard keys."""
        self.property_labels = property_labels
        self.wildcard_property_keys = wildcard_property_keys

    def execution_properties(self, platform: Platform) -> tuple[str, dict[str, list[str]]]:
        """
        Return the first configured label whose properties contain every
        non-wildcard property of the platform.

        Returns:
            ``(label, properties)`` for the matching label, with the label's
            full property set merged by name.

        Raises:
            FailedPreconditionError: if no configured label matches.
        """
        execute_properties = set()
        for entry in platform.properties:
            if entry.name in self.wildcard_property_keys:
                continue
            execute_properties.add((entry.name, entry.value))

        for candidate in self.property_labels:
            # The request matches when it asks for nothing outside the label.
            if execute_properties <= candidate.properties:
                return candidate.label, merge_property_pairs(candidate.properties)

        raise FailedPreconditionError(f"Could not find property set for {execute_properties}")

    def worker_properties(self, bot_session: BotSession) -> list[dict[str, list[str]]]:
        """
        Return the merged properties of every configured label that the bot
        fully satisfies with its non-wildcard properties.

        Raises:
            FailedPreconditionError: if the bot satisfies no configured label.
        """
        bots_properties = bot_properties(bot_session)

        property_pairs = set()
        for key, values in bots_properties.items():
            if key in self.wildcard_property_keys:
                continue
            for value in values:
                property_pairs.add((key, value))

        satisfied = [
            merge_property_pairs(candidate.properties)
            for candidate in self.property_labels
            if candidate.properties <= property_pairs
        ]

        if not satisfied:
            raise FailedPreconditionError(f"Could not find property set for {bots_properties}")
        return [{k: sorted(v) for k, v in props.items()} for props in satisfied]

    def bot_property_labels(self, bot_session: BotSession) -> list[str]:
        """
        Return the labels for which the bot reports every property pair.
        """
        bot_props = bot_properties(bot_session)

        def bot_has(key: str, value: str) -> bool:
            return key in bot_props and value in bot_props[key]

        labels = []
        for property_label in self.property_labels:
            # To gain a label the bot must have all the properties for that label.
            if all(bot_has(key, value) for key, value in property_label.properties):
                labels.append(property_label.label)
        return labels

167 

168 

def bot_properties(bot_session: BotSession) -> dict[str, set[str]]:
    """
    Collect the properties of the bot's primary device, grouped by key.

    Returns an empty dictionary when the worker has no devices.
    """
    devices = bot_session.worker.devices
    if not devices:
        return {}

    # According to the spec:
    #   "The first device in the worker is the "primary device" -
    #   that is, the device running a bot and which is
    #   responsible for actually executing commands."
    capabilities: dict[str, set[str]] = {}
    for device_property in devices[0].properties:
        capabilities.setdefault(device_property.key, set()).add(device_property.value)
    return capabilities

183 

184 

def partial_bot_properties(properties: dict[str, set[str]]) -> list[dict[str, list[str]]]:
    """
    Return the merged form of every subset of the flattened property pairs.

    The result starts with the empty subset and grows by subset size. A
    warning is logged when the number of pairs exceeds
    CAPABILITIES_WARNING_THRESHOLD.
    """
    property_pairs = flatten_properties(properties)
    pair_count = len(property_pairs)

    if pair_count > CAPABILITIES_WARNING_THRESHOLD:
        LOGGER.warning(
            "A worker with a large capabilities dictionary has been connected. "
            f"Processing its capabilities may take a while. Capabilities: {property_pairs}"
        )

    # Powerset of the pairs: all combinations of size 0, then size 1, and so
    # on up to the full set.
    return [
        merge_property_pairs(subset)
        for size in range(pair_count + 1)
        for subset in combinations(property_pairs, size)
    ]

197 

198 

def flatten_properties(properties: dict[str, set[str]]) -> list[tuple[str, str]]:
    """
    Expand a name -> values mapping into a list of (name, value) pairs,
    ordered by name and then by value.
    """
    flattened: list[tuple[str, str]] = []
    for name in sorted(properties):
        flattened.extend((name, value) for value in sorted(properties[name]))
    return flattened

201 

202 

def merge_property_pairs(property_pairs: Iterable[tuple[str, str]]) -> dict[str, list[str]]:
    """
    Group (name, value) pairs by name.

    Each name maps to the sorted list of its values; duplicate values are
    kept. Names appear in the order they are first seen.
    """
    grouped: dict[str, list[str]] = {}
    for name, value in property_pairs:
        if name in grouped:
            grouped[name].append(value)
        else:
            grouped[name] = [value]

    for values in grouped.values():
        values.sort()
    return grouped