Coverage for /builds/BuildGrid/buildgrid/buildgrid/utils.py: 95.71% (140 statements), coverage.py v7.4.1, created at 2024-03-28 16:20 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import hashlib
import json
import os
from functools import partial
from io import BytesIO
from operator import attrgetter
from typing import (
    IO,
    AnyStr,
    BinaryIO,
    Dict,
    Generator,
    Iterable,
    Iterator,
    List,
    Mapping,
    Optional,
    Sequence,
    Set,
    Tuple,
    TypeVar,
    Union,
    cast,
)
from urllib.parse import urljoin

from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid.settings import BROWSER_URL_FORMAT, HASH, HASH_LENGTH

T = TypeVar("T")


secure_uri_schemes = ["https", "grpcs"]
insecure_uri_schemes = ["http", "grpc"]


class BrowserURL:
    __url_markers = (
        "%(instance)s",
        "%(type)s",
        "%(hash)s",
        "%(sizebytes)s",
    )

    def __init__(self, base_url: str, instance_name: Optional[str] = None) -> None:
        """Begins browser URL helper initialization."""
        self.__base_url = base_url
        self.__initialized = False
        self.__url_spec = {
            "%(instance)s": instance_name or "",
        }

    def for_message(self, message_type: str, message_digest: Digest) -> bool:
        """Completes browser URL initialization for a protobuf message."""
        if self.__initialized:
            return False

        self.__url_spec["%(type)s"] = message_type
        self.__url_spec["%(hash)s"] = message_digest.hash
        self.__url_spec["%(sizebytes)s"] = str(message_digest.size_bytes)

        self.__initialized = True
        return True

    def generate(self) -> Optional[Union[str, bytes]]:
        """Generates a browser URL string."""
        if not self.__base_url or not self.__initialized:
            return None

        url_tail = BROWSER_URL_FORMAT

        for url_marker in self.__url_markers:
            if url_marker not in self.__url_spec:
                return None
            if url_marker not in url_tail:
                continue
            url_tail = url_tail.replace(url_marker, self.__url_spec[url_marker])

        return urljoin(self.__base_url, url_tail)
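
# A minimal usage sketch (illustrative only: the generated URL depends on the
# configured BROWSER_URL_FORMAT template, and the host and digest values below
# are hypothetical):
#
#     url = BrowserURL("https://browser.example.com", instance_name="dev")
#     url.for_message("action", Digest(hash="e3b0c442...", size_bytes=42))
#     url.generate()  # e.g. "https://browser.example.com/dev/action/e3b0c442.../42"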


def get_hash_type() -> "remote_execution_pb2.DigestFunction.Value.ValueType":
    """Returns the hash type."""
    hash_name = HASH().name
    if hash_name == "sha256":
        return remote_execution_pb2.DigestFunction.SHA256
    return remote_execution_pb2.DigestFunction.UNKNOWN


def create_digest(bytes_to_digest: bytes) -> remote_execution_pb2.Digest:
    """Computes the :obj:`Digest` of a piece of data.

    The :obj:`Digest` of a piece of data is a function of its hash **and** size.

    Args:
        bytes_to_digest (bytes): byte data to digest.

    Returns:
        :obj:`Digest`: The :obj:`Digest` for the given byte data.
    """
    return remote_execution_pb2.Digest(hash=HASH(bytes_to_digest).hexdigest(), size_bytes=len(bytes_to_digest))
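
# Illustrative doctest (assumes the configured HASH is hashlib.sha256, the
# case get_hash_type() above reports as SHA256):
#
#     >>> create_digest(b"hello").size_bytes
#     5
#     >>> create_digest(b"hello").hash == hashlib.sha256(b"hello").hexdigest()
#     True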


def create_digest_from_file(file_obj: BinaryIO) -> remote_execution_pb2.Digest:
    """Computes the :obj:`Digest` of a file-like object.

    The :obj:`Digest` contains a hash of the file's contents and the size of
    those contents. This function only reads the content in chunks for hashing,
    so is safe to use on large files.

    Args:
        file_obj (BinaryIO): A file-like object of some kind.

    Returns:
        :obj:`Digest`: The :obj:`Digest` for the given file object.
    """
    digest = remote_execution_pb2.Digest()

    # Make sure we're hashing from the start of the file
    file_obj.seek(0)

    # Generate the file hash and keep track of the file size
    hasher = HASH()
    digest.size_bytes = 0
    for block in iter(partial(file_obj.read, 8192), b""):
        hasher.update(block)
        digest.size_bytes += len(block)
    digest.hash = hasher.hexdigest()

    # Return to the start of the file ready for future reads
    file_obj.seek(0)
    return digest
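
# Illustrative equivalence (BytesIO stands in for any seekable binary stream;
# chunked hashing must match hashing the bytes in one go):
#
#     >>> create_digest_from_file(BytesIO(b"hello")) == create_digest(b"hello")
#     True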


def parse_digest(digest_string: str) -> Optional[remote_execution_pb2.Digest]:
    """Creates a :obj:`Digest` from a digest string.

    A digest string should always be: ``{hash}/{size_bytes}``.

    Args:
        digest_string (str): the digest string.

    Returns:
        :obj:`Digest`: The :obj:`Digest` read from the string or None if
            `digest_string` is not a valid digest string.
    """
    # Treat a string without exactly one "/" separator as invalid rather
    # than letting the unpacking raise ValueError:
    try:
        digest_hash, digest_size = digest_string.split("/")
    except ValueError:
        return None

    if len(digest_hash) == HASH_LENGTH and digest_size.isdigit():
        return remote_execution_pb2.Digest(hash=digest_hash, size_bytes=int(digest_size))

    return None
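
# Illustrative round-trip (with a sha256 HASH, HASH_LENGTH is 64):
#
#     >>> digest = create_digest(b"hello")
#     >>> parse_digest(f"{digest.hash}/{digest.size_bytes}") == digest
#     True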


def validate_digest_data(digest: remote_execution_pb2.Digest, data: bytes) -> bool:
    """Validate that the given digest corresponds to the given data."""
    return len(data) == digest.size_bytes and HASH(data).hexdigest() == digest.hash
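
# Illustrative check:
#
#     >>> validate_digest_data(create_digest(b"hello"), b"hello")
#     True
#     >>> validate_digest_data(create_digest(b"hello"), b"world")
#     False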


def read_file(file_path: str) -> bytes:
    """Loads raw file content in memory.

    Args:
        file_path (str): path to the target file.

    Returns:
        bytes: Raw file's content until EOF.

    Raises:
        OSError: If `file_path` does not exist or is not readable.
    """
    with open(file_path, "rb") as byte_file:
        return byte_file.read()


def read_and_rewind(read_head: IO[AnyStr]) -> Optional[AnyStr]:
    """Reads from an IO object and returns the data found there
    after rewinding the object to the beginning.

    Args:
        read_head (IO): readable IO head

    Returns:
        AnyStr: readable content from `read_head`.
    """
    if not read_head:
        return None

    data = read_head.read()
    read_head.seek(0)
    return data


def merkle_tree_maker(
    directory_path: str,
) -> Iterator[Tuple[Union[remote_execution_pb2.FileNode, remote_execution_pb2.DirectoryNode], BinaryIO, str]]:
    """Walks a local folder tree, generating :obj:`FileNode` and
    :obj:`DirectoryNode`.

    Args:
        directory_path (str): absolute or relative path to a local directory.

    Yields:
        :obj:`Message`, BinaryIO, str: a tuple of either a :obj:`FileNode` or
        :obj:`DirectoryNode` message, the corresponding blob and the
        corresponding node path.
    """
    directory_name = os.path.basename(directory_path)

    # Actual generator, yields recursively FileNodes and DirectoryNodes:
    def __merkle_tree_maker(directory_path: str, directory_name: str) -> Generator[
        Tuple[Union[remote_execution_pb2.FileNode, remote_execution_pb2.DirectoryNode], BinaryIO, str],
        None,
        Tuple[Union[remote_execution_pb2.FileNode, remote_execution_pb2.DirectoryNode], BinaryIO, str],
    ]:
        if not os.path.isabs(directory_path):
            directory_path = os.path.abspath(directory_path)

        directory = remote_execution_pb2.Directory()

        files, directories, symlinks = [], [], []
        for directory_entry in os.scandir(directory_path):
            node_name, node_path = directory_entry.name, directory_entry.path

            node: Union[remote_execution_pb2.FileNode, remote_execution_pb2.DirectoryNode]
            node_blob: BinaryIO
            if directory_entry.is_file(follow_symlinks=False):
                with open(directory_entry.path, "rb") as node_blob:
                    node_digest = create_digest_from_file(node_blob)

                    node = remote_execution_pb2.FileNode()
                    node.name = node_name
                    node.digest.CopyFrom(node_digest)
                    node.is_executable = os.access(node_path, os.X_OK)

                    files.append(node)

                    # Yield inside the `with` block so the open blob is still
                    # readable by the consumer:
                    yield node, node_blob, node_path

            elif directory_entry.is_dir(follow_symlinks=False):
                node, node_blob, _ = yield from __merkle_tree_maker(node_path, node_name)

                directories.append(cast(remote_execution_pb2.DirectoryNode, node))

                yield node, node_blob, node_path

            # Create a SymlinkNode:
            elif os.path.islink(directory_entry.path):
                node_target = os.readlink(directory_entry.path)

                symlink_node = remote_execution_pb2.SymlinkNode()
                symlink_node.name = directory_entry.name
                symlink_node.target = node_target

                symlinks.append(symlink_node)

        files.sort(key=attrgetter("name"))
        directories.sort(key=attrgetter("name"))
        symlinks.sort(key=attrgetter("name"))

        directory.files.extend(files)
        directory.directories.extend(directories)
        directory.symlinks.extend(symlinks)

        node_data = directory.SerializeToString()
        node_digest = create_digest(node_data)

        dir_node = remote_execution_pb2.DirectoryNode()
        dir_node.name = directory_name
        dir_node.digest.CopyFrom(node_digest)

        return dir_node, BytesIO(node_data), directory_path

    node, node_blob, node_path = yield from __merkle_tree_maker(directory_path, directory_name)

    yield node, node_blob, node_path
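
# Consumer sketch (hypothetical helper, not part of this module). The last
# item yielded is the DirectoryNode for `directory_path` itself, so a caller
# can collect every blob as it streams past and take the root digest from the
# final node:
#
#     def collect_blobs(directory_path: str) -> Dict[str, bytes]:
#         blobs: Dict[str, bytes] = {}
#         for node, blob, _ in merkle_tree_maker(directory_path):
#             blobs[node.digest.hash] = read_and_rewind(blob)
#         return blobs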


def convert_values_to_sorted_lists(
    dictionary: Mapping[str, Union[str, Sequence[str], Set[str]]]
) -> Dict[str, List[str]]:
    """Given a dictionary, do the following:

    1. Turn strings into singleton lists
    2. Turn all other iterable types into sorted lists

    This returns the converted dictionary and does not change the dictionary
    that was passed in.

    Raises:
        ValueError: If a value is not sortable.
    """
    normalized: Dict[str, List[str]] = {}
    for key, value in dictionary.items():
        if isinstance(value, str):
            normalized[key] = [value]
        else:
            try:
                normalized[key] = sorted(list(value))
            except TypeError:
                raise ValueError(f"{value} cannot be sorted")
    return normalized
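
# Illustrative behaviour (insertion order of keys is preserved):
#
#     >>> convert_values_to_sorted_lists({"OSFamily": "Linux", "ISA": {"x86-64", "x86-32"}})
#     {'OSFamily': ['Linux'], 'ISA': ['x86-32', 'x86-64']}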


def hash_from_dict(dictionary: Mapping[str, List[str]]) -> str:
    """Get the hash representation of a dictionary"""
    return hashlib.sha1(json.dumps(dictionary, sort_keys=True).encode()).hexdigest()
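
# Key order does not affect the hash, since the JSON dump sorts keys:
#
#     >>> hash_from_dict({"a": ["1"], "b": ["2"]}) == hash_from_dict({"b": ["2"], "a": ["1"]})
#     True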


def get_unique_objects_by_attribute(objects: Sequence[T], attribute: str) -> Iterable[T]:
    """Yield unique objects based on a hashable attribute or chained attributes.

    Note that this does not provide any sanitization, and any problematic elements will
    only raise exceptions when iterated on."""

    attrs_seen = set()

    for obj in objects:
        if obj:
            attr_value = attrgetter(attribute)(obj)
            if attr_value not in attrs_seen:
                attrs_seen.add(attr_value)
                yield obj
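
# Illustrative deduplication; dotted attribute paths such as "digest.hash"
# also work, courtesy of attrgetter:
#
#     >>> d1 = Digest(hash="aa", size_bytes=1)
#     >>> d2 = Digest(hash="aa", size_bytes=1)
#     >>> len(list(get_unique_objects_by_attribute([d1, d2], "hash")))
#     1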


def retry_delay(retry_attempt: int, delay_base: int = 1) -> float:
    """Compute a bounded exponential backoff delay for a given retry attempt."""
    attempt = min(5, retry_attempt)  # Limit the delay to ~10.5x the base time
    return round(delay_base * (1.6**attempt), 1)
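
# With the default delay_base the successive delays (in seconds) grow by a
# factor of 1.6 and are capped from the fifth attempt onwards:
#
#     >>> [retry_delay(n) for n in range(1, 7)]
#     [1.6, 2.6, 4.1, 6.6, 10.5, 10.5]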


def flatten_capabilities(capabilities: Mapping[str, Union[Set[str], List[str]]]) -> List[Tuple[str, str]]:
    """Flatten a capabilities dictionary.

    This method takes a capabilities dictionary and flattens it into a
    list of key/value tuples describing all the platform properties
    that the capabilities map to. To do this, it assumes that all of the
    dictionary's values are iterable.

    For example,

    ``{'OSFamily': {'Linux'}, 'ISA': {'x86-32', 'x86-64'}}``

    becomes

    ``[('OSFamily', 'Linux'), ('ISA', 'x86-32'), ('ISA', 'x86-64')]``

    Args:
        capabilities (dict): The capabilities dictionary to flatten.

    Returns:
        list containing the flattened dictionary key-value tuples.

    """
    return [(name, value) for name, value_list in capabilities.items() for value in value_list]