Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/scheduler/properties.py: 95.70%

93 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2024 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14import hashlib 

15import json 

16from collections import defaultdict 

17from dataclasses import dataclass 

18from itertools import chain, combinations 

19from typing import Dict, Iterable, List, Optional, Protocol, Set, Tuple 

20 

21from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Platform 

22from buildgrid._protos.google.devtools.remoteworkers.v1test2.bots_pb2 import BotSession 

23from buildgrid.server.exceptions import FailedPreconditionError 

24from buildgrid.server.logging import buildgrid_logger 

25 

26CAPABILITIES_WARNING_THRESHOLD = 10 

27LOGGER = buildgrid_logger(__name__) 

28 

29 

30def hash_from_dict(dictionary: Dict[str, List[str]]) -> str: 

31 """Get the hash represntation of a dictionary""" 

32 return hashlib.sha1(json.dumps(dictionary, sort_keys=True).encode()).hexdigest() 

33 

34 

35class PropertySet(Protocol): 

36 def execution_properties(self, platform: Platform) -> Tuple[str, Dict[str, List[str]]]: 

37 """ 

38 Parses a platform value and returns the match properties used for scheduling. 

39 Returns a label which can be used for applying metrics. 

40 """ 

41 

42 def worker_properties(self, bot_session: BotSession) -> List[Dict[str, List[str]]]: 

43 """ 

44 Find all the valid property combinations which can be used to assign work to a bot. 

45 """ 

46 

47 

48class DynamicPropertySet: 

49 def __init__( 

50 self, 

51 *, 

52 unique_property_keys: Set[str], 

53 match_property_keys: Set[str], 

54 wildcard_property_keys: Set[str], 

55 label_key: Optional[str] = None, 

56 ) -> None: 

57 if unregistered_unique_keys := (unique_property_keys - match_property_keys) - wildcard_property_keys: 

58 raise ValueError(f"Unique keys configured which are not match or wildcards: {unregistered_unique_keys}") 

59 

60 if label_key and label_key not in match_property_keys and label_key not in wildcard_property_keys: 

61 raise ValueError(f"Label key is not registered as a match or wildcard key: {label_key}") 

62 

63 self.unique_property_keys = set(unique_property_keys) 

64 self.match_property_keys = set(match_property_keys) 

65 self.wildcard_property_keys = set(wildcard_property_keys) 

66 self.all_property_keys = match_property_keys | wildcard_property_keys 

67 self.label_key = label_key 

68 

69 def execution_properties(self, platform: Platform) -> Tuple[str, Dict[str, List[str]]]: 

70 properties: Dict[str, Set[str]] = defaultdict(set) 

71 for platform_property in platform.properties: 

72 properties[platform_property.name].add(platform_property.value) 

73 

74 label = "unknown" 

75 if self.label_key in properties: 

76 label = sorted(properties[self.label_key])[0] 

77 

78 for name, values in properties.items(): 

79 if name not in self.all_property_keys: 

80 raise FailedPreconditionError( 

81 f"Unregistered platform property [{name}={values}]." 

82 f" Known properties are: [{self.all_property_keys}]" 

83 ) 

84 if name in self.unique_property_keys and len(values) > 1: 

85 raise FailedPreconditionError( 

86 f"Unique platform property [{name}] can only be set once. Got: [{values}]" 

87 ) 

88 

89 result = {k: sorted(v) for k, v in properties.items() if k in self.match_property_keys} 

90 return label, result 

91 

92 def worker_properties(self, bot_session: BotSession) -> List[Dict[str, List[str]]]: 

93 properties = bot_properties(bot_session) 

94 properties = {k: v for k, v in properties.items() if k in self.match_property_keys} 

95 return partial_bot_properties(properties) 

96 

97 

98Properties = Set[Tuple[str, str]] 

99 

100 

101@dataclass 

102class PropertyLabel: 

103 label: str 

104 properties: Properties 

105 

106 

107class StaticPropertySet: 

108 def __init__( 

109 self, 

110 *, 

111 property_labels: List[PropertyLabel], 

112 wildcard_property_keys: Set[str], 

113 ) -> None: 

114 self.property_labels = property_labels 

115 self.wildcard_property_keys = wildcard_property_keys 

116 

117 def execution_properties(self, platform: Platform) -> Tuple[str, Dict[str, List[str]]]: 

118 execute_properties = { 

119 (platform_property.name, platform_property.value) 

120 for platform_property in platform.properties 

121 if platform_property.name not in self.wildcard_property_keys 

122 } 

123 

124 for property_label in self.property_labels: 

125 if len(execute_properties - property_label.properties) == 0: 

126 return property_label.label, merge_property_pairs(property_label.properties) 

127 

128 raise FailedPreconditionError(f"Could not find property set for {execute_properties}") 

129 

130 def worker_properties(self, bot_session: BotSession) -> List[Dict[str, List[str]]]: 

131 bots_properties = bot_properties(bot_session) 

132 property_pairs = { 

133 (key, value) 

134 for key, values in bots_properties.items() 

135 for value in values 

136 if key not in self.wildcard_property_keys 

137 } 

138 

139 property_sets = [] 

140 for property_set in self.property_labels: 

141 if len(property_set.properties - property_pairs) == 0: 

142 property_sets.append(merge_property_pairs(property_set.properties)) 

143 

144 if len(property_sets) == 0: 

145 raise FailedPreconditionError(f"Could not find property set for {bots_properties}") 

146 return [{k: sorted(v) for k, v in props.items()} for props in property_sets] 

147 

148 

149def bot_properties(bot_session: BotSession) -> Dict[str, Set[str]]: 

150 worker_capabilities: Dict[str, Set[str]] = {} 

151 if bot_session.worker.devices: 

152 # According to the spec: 

153 # "The first device in the worker is the "primary device" - 

154 # that is, the device running a bot and which is 

155 # responsible for actually executing commands." 

156 primary_device = bot_session.worker.devices[0] 

157 

158 for device_property in primary_device.properties: 

159 if device_property.key not in worker_capabilities: 

160 worker_capabilities[device_property.key] = set() 

161 worker_capabilities[device_property.key].add(device_property.value) 

162 return worker_capabilities 

163 

164 

165def partial_bot_properties(properties: Dict[str, Set[str]]) -> List[Dict[str, List[str]]]: 

166 property_pairs = flatten_properties(properties) 

167 

168 if len(property_pairs) > CAPABILITIES_WARNING_THRESHOLD: 

169 LOGGER.warning( 

170 "A worker with a large capabilities dictionary has been connected. " 

171 f"Processing its capabilities may take a while. Capabilities: {property_pairs}" 

172 ) 

173 

174 # Using the itertools powerset recipe, construct the powerset of the tuples 

175 powerset = chain.from_iterable(combinations(property_pairs, r) for r in range(len(property_pairs) + 1)) 

176 return list(map(merge_property_pairs, powerset)) 

177 

178 

179def flatten_properties(properties: Dict[str, Set[str]]) -> List[Tuple[str, str]]: 

180 return [(name, value) for name in sorted(properties) for value in sorted(properties[name])] 

181 

182 

183def merge_property_pairs(property_pairs: Iterable[Tuple[str, str]]) -> Dict[str, List[str]]: 

184 properties: Dict[str, List[str]] = {} 

185 for name, value in property_pairs: 

186 properties.setdefault(name, []).append(value) 

187 return {k: sorted(v) for k, v in properties.items()}