Coverage for /builds/BuildGrid/buildgrid/buildgrid/utils.py: 95.92%

147 statements  

coverage.py v7.4.1, created at 2024-06-11 15:37 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import hashlib
import json
import os
from dataclasses import dataclass
from functools import partial
from io import BytesIO
from operator import attrgetter
from typing import (
    IO,
    AnyStr,
    BinaryIO,
    Dict,
    Generator,
    Iterable,
    Iterator,
    List,
    Mapping,
    Optional,
    Sequence,
    Set,
    Tuple,
    TypeVar,
    Union,
    cast,
)
from urllib.parse import urljoin

from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid.settings import BROWSER_URL_FORMAT, HASH, HASH_LENGTH

T = TypeVar("T")


secure_uri_schemes = ["https", "grpcs"]
insecure_uri_schemes = ["http", "grpc"]


class BrowserURL:
    __url_markers = (
        "%(instance)s",
        "%(type)s",
        "%(hash)s",
        "%(sizebytes)s",
    )

    def __init__(self, base_url: str, instance_name: Optional[str] = None) -> None:
        """Begins browser URL helper initialization."""
        self.__base_url = base_url
        self.__initialized = False
        self.__url_spec = {
            "%(instance)s": instance_name or "",
        }

    def for_message(self, message_type: str, message_digest: Digest) -> bool:
        """Completes browser URL initialization for a protobuf message."""
        if self.__initialized:
            return False

        self.__url_spec["%(type)s"] = message_type
        self.__url_spec["%(hash)s"] = message_digest.hash
        self.__url_spec["%(sizebytes)s"] = str(message_digest.size_bytes)

        self.__initialized = True
        return True

    def generate(self) -> Optional[Union[str, bytes]]:
        """Generates a browser URL string."""
        if not self.__base_url or not self.__initialized:
            return None

        url_tail = BROWSER_URL_FORMAT

        for url_marker in self.__url_markers:
            if url_marker not in self.__url_spec:
                return None
            if url_marker not in url_tail:
                continue
            url_tail = url_tail.replace(url_marker, self.__url_spec[url_marker])

        return urljoin(self.__base_url, url_tail)

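# Illustrative usage sketch (not part of the original module; the base URL and
# `action_digest` are hypothetical, and the exact output shape depends on
# BROWSER_URL_FORMAT in buildgrid.settings):
#
#     url_helper = BrowserURL("https://browser.example.com", "dev-instance")
#     url_helper.for_message("action", action_digest)  # True; arms the helper once
#     url_helper.generate()  # base URL joined with the filled-in BROWSER_URL_FORMAT
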

@dataclass(frozen=True)
class HashableDigest:
    hash: str
    size_bytes: int

    def to_digest(self) -> Digest:
        return Digest(hash=self.hash, size_bytes=self.size_bytes)

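# Illustrative sketch (an assumption, not original code): protobuf Digest
# messages are not hashable in Python, so HashableDigest enables set-based
# deduplication before converting back:
#
#     unique = {HashableDigest(d.hash, d.size_bytes) for d in digests}
#     deduplicated = [hashable.to_digest() for hashable in unique]
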

def get_hash_type() -> "remote_execution_pb2.DigestFunction.Value.ValueType":
    """Returns the hash type."""
    hash_name = HASH().name
    if hash_name == "sha256":
        return remote_execution_pb2.DigestFunction.SHA256
    return remote_execution_pb2.DigestFunction.UNKNOWN


def create_digest(bytes_to_digest: bytes) -> remote_execution_pb2.Digest:
    """Computes the :obj:`Digest` of a piece of data.

    The :obj:`Digest` of a piece of data is a function of its hash **and** its size.

    Args:
        bytes_to_digest (bytes): byte data to digest.

    Returns:
        :obj:`Digest`: The :obj:`Digest` for the given byte data.
    """
    return remote_execution_pb2.Digest(hash=HASH(bytes_to_digest).hexdigest(), size_bytes=len(bytes_to_digest))

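# Worked example (the hash value assumes the default sha256 HASH from
# buildgrid.settings):
#
#     >>> digest = create_digest(b"hello")
#     >>> digest.size_bytes
#     5
#     >>> digest.hash
#     '2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824'
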

def create_digest_from_file(file_obj: BinaryIO) -> remote_execution_pb2.Digest:
    """Computes the :obj:`Digest` of a file-like object.

    The :obj:`Digest` contains a hash of the file's contents and the size of
    those contents. This function only reads the content in chunks for hashing,
    so it is safe to use on large files.

    Args:
        file_obj (BinaryIO): A file-like object of some kind.

    Returns:
        :obj:`Digest`: The :obj:`Digest` for the given file object.
    """
    digest = remote_execution_pb2.Digest()

    # Make sure we're hashing from the start of the file
    file_obj.seek(0)

    # Generate the file hash and keep track of the file size
    hasher = HASH()
    digest.size_bytes = 0
    for block in iter(partial(file_obj.read, 8192), b""):
        hasher.update(block)
        digest.size_bytes += len(block)
    digest.hash = hasher.hexdigest()

    # Return to the start of the file ready for future reads
    file_obj.seek(0)
    return digest

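# Sketch of the chunked-hashing equivalence (illustrative): hashing a file-like
# object in 8 KiB blocks yields the same Digest as hashing the bytes directly:
#
#     >>> create_digest_from_file(BytesIO(b"hello")) == create_digest(b"hello")
#     True
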

def parse_digest(digest_string: str) -> Optional[remote_execution_pb2.Digest]:
    """Creates a :obj:`Digest` from a digest string.

    A digest string should always be: ``{hash}/{size_bytes}``.

    Args:
        digest_string (str): the digest string.

    Returns:
        :obj:`Digest`: The :obj:`Digest` read from the string or None if
            `digest_string` is not a valid digest string.
    """
    digest_hash, digest_size = digest_string.split("/")

    if len(digest_hash) == HASH_LENGTH and digest_size.isdigit():
        return remote_execution_pb2.Digest(hash=digest_hash, size_bytes=int(digest_size))

    return None

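# Illustrative round-trip between create_digest() and parse_digest():
#
#     >>> digest = create_digest(b"hello")
#     >>> parse_digest(f"{digest.hash}/{digest.size_bytes}") == digest
#     True
#     >>> parse_digest("not-a-real-hash/5") is None  # wrong hash length
#     True
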

def validate_digest_data(digest: remote_execution_pb2.Digest, data: bytes) -> bool:
    """Validate that the given digest corresponds to the given data."""
    return len(data) == digest.size_bytes and HASH(data).hexdigest() == digest.hash


def read_file(file_path: str) -> bytes:
    """Loads raw file content into memory.

    Args:
        file_path (str): path to the target file.

    Returns:
        bytes: Raw file's content until EOF.

    Raises:
        OSError: If `file_path` does not exist or is not readable.
    """
    with open(file_path, "rb") as byte_file:
        return byte_file.read()


def read_and_rewind(read_head: IO[AnyStr]) -> Optional[AnyStr]:
    """Reads from an IO object and returns the data found there
    after rewinding the object to the beginning.

    Args:
        read_head (IO): readable IO head

    Returns:
        AnyStr: readable content from `read_head`.
    """
    if not read_head:
        return None

    data = read_head.read()
    read_head.seek(0)
    return data


def merkle_tree_maker(
    directory_path: str,
) -> Iterator[Tuple[Union[remote_execution_pb2.FileNode, remote_execution_pb2.DirectoryNode], BinaryIO, str]]:
    """Walks a local folder tree, generating :obj:`FileNode` and
    :obj:`DirectoryNode`.

    Args:
        directory_path (str): absolute or relative path to a local directory.

    Yields:
        :obj:`Message`, BinaryIO, str: a tuple of either a :obj:`FileNode` or
            :obj:`DirectoryNode` message, the corresponding blob and the
            corresponding node path.
    """
    directory_name = os.path.basename(directory_path)

    # Actual generator, recursively yields FileNodes and DirectoryNodes:
    def __merkle_tree_maker(directory_path: str, directory_name: str) -> Generator[
        Tuple[Union[remote_execution_pb2.FileNode, remote_execution_pb2.DirectoryNode], BinaryIO, str],
        None,
        Tuple[Union[remote_execution_pb2.FileNode, remote_execution_pb2.DirectoryNode], BinaryIO, str],
    ]:
        if not os.path.isabs(directory_path):
            directory_path = os.path.abspath(directory_path)

        directory = remote_execution_pb2.Directory()

        files, directories, symlinks = [], [], []
        for directory_entry in os.scandir(directory_path):
            node_name, node_path = directory_entry.name, directory_entry.path

            node: Union[remote_execution_pb2.FileNode, remote_execution_pb2.DirectoryNode]
            node_blob: BinaryIO
            if directory_entry.is_file(follow_symlinks=False):
                with open(directory_entry.path, "rb") as node_blob:
                    node_digest = create_digest_from_file(node_blob)

                    node = remote_execution_pb2.FileNode()
                    node.name = node_name
                    node.digest.CopyFrom(node_digest)
                    node.is_executable = os.access(node_path, os.X_OK)

                    files.append(node)

                    yield node, node_blob, node_path

            elif directory_entry.is_dir(follow_symlinks=False):
                node, node_blob, _ = yield from __merkle_tree_maker(node_path, node_name)

                directories.append(cast(remote_execution_pb2.DirectoryNode, node))

                yield node, node_blob, node_path

            # Create a SymlinkNode:
            elif os.path.islink(directory_entry.path):
                node_target = os.readlink(directory_entry.path)

                symlink_node = remote_execution_pb2.SymlinkNode()
                symlink_node.name = directory_entry.name
                symlink_node.target = node_target

                symlinks.append(symlink_node)

        files.sort(key=attrgetter("name"))
        directories.sort(key=attrgetter("name"))
        symlinks.sort(key=attrgetter("name"))

        directory.files.extend(files)
        directory.directories.extend(directories)
        directory.symlinks.extend(symlinks)

        node_data = directory.SerializeToString()
        node_digest = create_digest(node_data)

        dir_node = remote_execution_pb2.DirectoryNode()
        dir_node.name = directory_name
        dir_node.digest.CopyFrom(node_digest)

        return dir_node, BytesIO(node_data), directory_path

    node, node_blob, node_path = yield from __merkle_tree_maker(directory_path, directory_name)

    yield node, node_blob, node_path

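# Illustrative consumption sketch (`upload_blob` is a hypothetical callback):
# children are yielded before their parent directory and the final item is
# always the root DirectoryNode, so blobs can be streamed as they appear:
#
#     last_node = None
#     for node, blob, path in merkle_tree_maker("./src"):
#         upload_blob(node.digest, blob)
#         last_node = node
#     root_directory_node = last_node
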

def convert_values_to_sorted_lists(
    dictionary: Mapping[str, Union[str, Sequence[str], Set[str]]]
) -> Dict[str, List[str]]:
    """Given a dictionary, do the following:

    1. Turn strings into singleton lists
    2. Turn all other sequence types into sorted lists

    This returns the converted dictionary and does not change the dictionary
    that was passed in.

    """
    normalized: Dict[str, List[str]] = {}
    for key, value in dictionary.items():
        if isinstance(value, str):
            normalized[key] = [value]
        else:
            try:
                normalized[key] = sorted(value)
            except TypeError:
                raise ValueError(f"{value} cannot be sorted")
    return normalized

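# Worked example (illustrative): strings become singleton lists, sets become
# sorted lists:
#
#     >>> convert_values_to_sorted_lists({"OSFamily": "Linux", "ISA": {"x86-64", "x86-32"}})
#     {'OSFamily': ['Linux'], 'ISA': ['x86-32', 'x86-64']}
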

def hash_from_dict(dictionary: Mapping[str, List[str]]) -> str:
    """Get the hash representation of a dictionary"""
    return hashlib.sha1(json.dumps(dictionary, sort_keys=True).encode()).hexdigest()

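# Because json.dumps is called with sort_keys=True, the hash is independent of
# key insertion order (illustrative):
#
#     >>> hash_from_dict({"a": ["1"], "b": ["2"]}) == hash_from_dict({"b": ["2"], "a": ["1"]})
#     True
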

def get_unique_objects_by_attribute(objects: Sequence[T], attribute: str) -> Iterable[T]:
    """Yield objects that are unique based on a hashable attribute or chained attributes.

    Note that this does not provide any sanitization, and any problematic elements will
    only raise exceptions when iterated on."""

    attrs_seen = set()

    for obj in objects:
        if obj:
            attr_value = attrgetter(attribute)(obj)
            if attr_value not in attrs_seen:
                attrs_seen.add(attr_value)
                yield obj

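# Illustrative sketch: `attribute` may be a dotted chain, resolved by
# operator.attrgetter (the lease/digest names here are hypothetical):
#
#     unique_leases = get_unique_objects_by_attribute(leases, "action_digest.hash")
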

def retry_delay(retry_attempt: int, delay_base: int = 1) -> float:
    attempt = min(5, retry_attempt)  # Limit the delay to ~10.5x the base time
    return round(delay_base * (1.6**attempt), 1)

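# Worked example with the default delay_base=1: delays grow geometrically by
# a factor of 1.6 and cap at attempt 5 (1.6**5 ~= 10.5):
#
#     >>> [retry_delay(n) for n in range(7)]
#     [1.0, 1.6, 2.6, 4.1, 6.6, 10.5, 10.5]
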

def flatten_capabilities(capabilities: Mapping[str, Union[Set[str], List[str]]]) -> List[Tuple[str, str]]:
    """Flatten a capabilities dictionary.

    This method takes a capabilities dictionary and flattens it into a
    list of key/value tuples describing all the platform properties
    that the capabilities map to. To do this, it assumes that all of the
    dictionary's values are iterable.

    For example,

    ``{'OSFamily': {'Linux'}, 'ISA': {'x86-32', 'x86-64'}}``

    becomes

    ``[('OSFamily', 'Linux'), ('ISA', 'x86-32'), ('ISA', 'x86-64')]``

    Args:
        capabilities (dict): The capabilities dictionary to flatten.

    Returns:
        list containing the flattened dictionary key-value tuples.

    """
    return [(name, value) for name, value_list in capabilities.items() for value in value_list]
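
# Illustrative note: with set values, as in the docstring example, pair order
# within a key follows set iteration order; feeding the output of
# convert_values_to_sorted_lists() instead makes the result deterministic:
#
#     >>> flatten_capabilities({"OSFamily": ["Linux"], "ISA": ["x86-32", "x86-64"]})
#     [('OSFamily', 'Linux'), ('ISA', 'x86-32'), ('ISA', 'x86-64')]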