Coverage for /builds/BuildGrid/buildgrid/buildgrid/utils.py: 95.71%
140 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-03-28 16:20 +0000
1# Copyright (C) 2018 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16import hashlib
17import json
18import os
19from functools import partial
20from io import BytesIO
21from operator import attrgetter
22from typing import (
23 IO,
24 AnyStr,
25 BinaryIO,
26 Dict,
27 Generator,
28 Iterable,
29 Iterator,
30 List,
31 Mapping,
32 Optional,
33 Sequence,
34 Set,
35 Tuple,
36 TypeVar,
37 Union,
38 cast,
39)
40from urllib.parse import urljoin
42from buildgrid._protos.build.bazel.remote.execution.v2 import remote_execution_pb2
43from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
44from buildgrid.settings import BROWSER_URL_FORMAT, HASH, HASH_LENGTH
# Generic type variable used by the helpers in this module
# (e.g. ``get_unique_objects_by_attribute``).
T = TypeVar("T")

# URI schemes treated as transport-secure vs. insecure when
# interpreting endpoint URLs elsewhere in BuildGrid.
secure_uri_schemes = ["https", "grpcs"]
insecure_uri_schemes = ["http", "grpc"]
class BrowserURL:
    """Two-step builder for browser URLs pointing at protobuf messages.

    Construct with a base URL, complete with :meth:`for_message`, then
    render the final URL with :meth:`generate`.
    """

    __url_markers = (
        "%(instance)s",
        "%(type)s",
        "%(hash)s",
        "%(sizebytes)s",
    )

    def __init__(self, base_url: str, instance_name: Optional[str] = None) -> None:
        """Begins browser URL helper initialization."""
        self.__base_url = base_url
        self.__initialized = False
        # Substitution table; completed later by for_message().
        self.__url_spec = {"%(instance)s": instance_name or ""}

    def for_message(self, message_type: str, message_digest: Digest) -> bool:
        """Completes browser URL initialization for a protobuf message.

        Returns False if the helper was already initialized, True otherwise.
        """
        if self.__initialized:
            return False

        self.__url_spec.update(
            {
                "%(type)s": message_type,
                "%(hash)s": message_digest.hash,
                "%(sizebytes)s": str(message_digest.size_bytes),
            }
        )
        self.__initialized = True
        return True

    def generate(self) -> Optional[Union[str, bytes]]:
        """Generates a browser URL string, or None if not fully initialized."""
        if not self.__base_url or not self.__initialized:
            return None

        url_tail = BROWSER_URL_FORMAT
        for marker in self.__url_markers:
            replacement = self.__url_spec.get(marker)
            if replacement is None:
                # Required substitution missing: cannot build a URL.
                return None
            if marker in url_tail:
                url_tail = url_tail.replace(marker, replacement)

        return urljoin(self.__base_url, url_tail)
def get_hash_type() -> "remote_execution_pb2.DigestFunction.Value.ValueType":
    """Returns the hash type.

    Maps the configured hash algorithm onto the REAPI ``DigestFunction``
    enum; any algorithm other than sha256 is reported as UNKNOWN.
    """
    if HASH().name == "sha256":
        return remote_execution_pb2.DigestFunction.SHA256
    return remote_execution_pb2.DigestFunction.UNKNOWN
def create_digest(bytes_to_digest: bytes) -> remote_execution_pb2.Digest:
    """Computes the :obj:`Digest` of a piece of data.

    The :obj:`Digest` of a data is a function of its hash **and** size.

    Args:
        bytes_to_digest (bytes): byte data to digest.

    Returns:
        :obj:`Digest`: The :obj:`Digest` for the given byte data.
    """
    digest_hash = HASH(bytes_to_digest).hexdigest()
    return remote_execution_pb2.Digest(hash=digest_hash, size_bytes=len(bytes_to_digest))
def create_digest_from_file(file_obj: BinaryIO) -> remote_execution_pb2.Digest:
    """Computes the :obj:`Digest` of a file-like object.

    The :obj:`Digest` contains a hash of the file's contents and the size of
    those contents. This function only reads the content in chunks for hashing,
    so is safe to use on large files.

    Args:
        file_obj (BinaryIO): A file-like object of some kind.

    Returns:
        :obj:`Digest`: The :obj:`Digest` for the given file object.
    """
    # Always hash from the very beginning of the stream.
    file_obj.seek(0)

    hasher = HASH()
    total_size = 0
    while True:
        block = file_obj.read(8192)
        if not block:
            break
        hasher.update(block)
        total_size += len(block)

    digest = remote_execution_pb2.Digest()
    digest.hash = hasher.hexdigest()
    digest.size_bytes = total_size

    # Leave the stream rewound, ready for future reads.
    file_obj.seek(0)
    return digest
def parse_digest(digest_string: str) -> Optional[remote_execution_pb2.Digest]:
    """Creates a :obj:`Digest` from a digest string.

    A digest string should always be: ``{hash}/{size_bytes}``.

    Args:
        digest_string (str): the digest string.

    Returns:
        :obj:`Digest`: The :obj:`Digest` read from the string or None if
        `digest_string` is not a valid digest string.
    """
    # Use partition() so malformed inputs (no "/" at all, or more than one)
    # fall through to the documented ``None`` return instead of raising a
    # ValueError from tuple-unpacking a split().
    digest_hash, separator, digest_size = digest_string.partition("/")

    if separator and "/" not in digest_size:
        if len(digest_hash) == HASH_LENGTH and digest_size.isdigit():
            return remote_execution_pb2.Digest(hash=digest_hash, size_bytes=int(digest_size))

    return None
def validate_digest_data(digest: remote_execution_pb2.Digest, data: bytes) -> bool:
    """Validate that the given digest corresponds to the given data."""
    # Compare the cheap size check first; only hash when sizes agree.
    if len(data) != digest.size_bytes:
        return False
    return HASH(data).hexdigest() == digest.hash
def read_file(file_path: str) -> bytes:
    """Loads raw file content in memory.

    Args:
        file_path (str): path to the target file.

    Returns:
        bytes: Raw file's content until EOF.

    Raises:
        OSError: If `file_path` does not exist or is not readable.
    """
    with open(file_path, "rb") as source:
        contents = source.read()
    return contents
def read_and_rewind(read_head: IO[AnyStr]) -> Optional[AnyStr]:
    """Reads from an IO object and returns the data found there
    after rewinding the object to the beginning.

    Args:
        read_head (IO): readable IO head

    Returns:
        AnyStr: readable content from `read_head`, or None for a falsy head.
    """
    if read_head:
        contents = read_head.read()
        read_head.seek(0)
        return contents
    return None
def merkle_tree_maker(
    directory_path: str,
) -> Iterator[Tuple[Union[remote_execution_pb2.FileNode, remote_execution_pb2.DirectoryNode], BinaryIO, str]]:
    """Walks a local folder tree, generating :obj:`FileNode` and
    :obj:`DirectoryNode`.

    Nodes are yielded bottom-up: all children of a directory are yielded
    before the directory's own node, and the root directory node is
    yielded last.

    Args:
        directory_path (str): absolute or relative path to a local directory.

    Yields:
        :obj:`Message`, bytes, str: a tuple of either a :obj:`FileNode` or
        :obj:`DirectoryNode` message, the corresponding blob and the
        corresponding node path.
    """
    directory_name = os.path.basename(directory_path)

    # Actual generator, yields recursively FileNodes and DirectoryNodes:
    # Uses ``yield from`` so each recursive call both streams its children's
    # nodes outward AND returns its own (dir_node, blob, path) tuple to the
    # caller via the generator's return value.
    def __merkle_tree_maker(directory_path: str, directory_name: str) -> Generator[
        Tuple[Union[remote_execution_pb2.FileNode, remote_execution_pb2.DirectoryNode], BinaryIO, str],
        None,
        Tuple[Union[remote_execution_pb2.FileNode, remote_execution_pb2.DirectoryNode], BinaryIO, str],
    ]:
        if not os.path.isabs(directory_path):
            directory_path = os.path.abspath(directory_path)

        directory = remote_execution_pb2.Directory()

        files, directories, symlinks = [], [], []
        for directory_entry in os.scandir(directory_path):
            node_name, node_path = directory_entry.name, directory_entry.path

            node: Union[remote_execution_pb2.FileNode, remote_execution_pb2.DirectoryNode]
            node_blob: BinaryIO
            if directory_entry.is_file(follow_symlinks=False):
                # Digest the file's contents in chunks (safe for large files).
                with open(directory_entry.path, "rb") as node_blob:
                    node_digest = create_digest_from_file(node_blob)

                node = remote_execution_pb2.FileNode()
                node.name = node_name
                node.digest.CopyFrom(node_digest)
                node.is_executable = os.access(node_path, os.X_OK)

                files.append(node)

                # NOTE(review): node_blob is yielded AFTER the ``with`` block
                # exits, so consumers receive a closed file handle — they can
                # read the digest/path but presumably must reopen the file for
                # content. Confirm against callers before changing.
                yield node, node_blob, node_path

            elif directory_entry.is_dir(follow_symlinks=False):
                # Recurse: stream the subtree's nodes, then collect the
                # subdirectory's own node from the generator return value.
                node, node_blob, _ = yield from __merkle_tree_maker(node_path, node_name)

                directories.append(cast(remote_execution_pb2.DirectoryNode, node))

                yield node, node_blob, node_path

            # Create a SymlinkNode;
            elif os.path.islink(directory_entry.path):
                node_target = os.readlink(directory_entry.path)

                symlink_node = remote_execution_pb2.SymlinkNode()
                symlink_node.name = directory_entry.name
                symlink_node.target = node_target

                symlinks.append(symlink_node)

        # REAPI requires Directory children sorted by name for a canonical,
        # reproducible serialization (and therefore a stable digest).
        files.sort(key=attrgetter("name"))
        directories.sort(key=attrgetter("name"))
        symlinks.sort(key=attrgetter("name"))

        directory.files.extend(files)
        directory.directories.extend(directories)
        directory.symlinks.extend(symlinks)

        node_data = directory.SerializeToString()
        node_digest = create_digest(node_data)

        dir_node = remote_execution_pb2.DirectoryNode()
        dir_node.name = directory_name
        dir_node.digest.CopyFrom(node_digest)

        return dir_node, BytesIO(node_data), directory_path

    # Drive the recursion from the root, then yield the root's own node last.
    node, node_blob, node_path = yield from __merkle_tree_maker(directory_path, directory_name)

    yield node, node_blob, node_path
def convert_values_to_sorted_lists(
    dictionary: Mapping[str, Union[str, Sequence[str], Set[str]]]
) -> Dict[str, List[str]]:
    """Given a dictionary, do the following:

    1. Turn strings into singleton lists
    2. Turn all other sequence types into sorted lists

    This returns the converted dictionary and does not change the dictionary
    that was passed in.

    Args:
        dictionary: mapping of keys to a string or an iterable of strings.

    Returns:
        dict: a new dictionary whose values are all sorted lists.

    Raises:
        ValueError: if a value's elements cannot be mutually ordered.
    """
    normalized: Dict[str, List[str]] = {}
    for key, value in dictionary.items():
        if isinstance(value, str):
            # A plain string is one value, not an iterable of characters.
            normalized[key] = [value]
        else:
            try:
                # sorted() already builds a fresh list; no intermediate
                # list() copy is needed.
                normalized[key] = sorted(value)
            except TypeError as exc:
                # Chain the original TypeError for easier debugging of
                # mixed-type or unorderable values.
                raise ValueError(f"{value} cannot be sorted") from exc
    return normalized
def hash_from_dict(dictionary: Mapping[str, List[str]]) -> str:
    """Get the hash representation of a dictionary.

    Keys are sorted before serialization so that equal dictionaries
    always produce the same hash.
    """
    serialized = json.dumps(dictionary, sort_keys=True)
    return hashlib.sha1(serialized.encode()).hexdigest()
def get_unique_objects_by_attribute(objects: Sequence[T], attribute: str) -> Iterable[T]:
    """Return a list of unique objects based on a hashable attribute or chained attributes.

    Note that this does not provide any sanitization, and any problematic elements will
    only raise exceptions when iterated on."""
    # Hoist the attribute getter out of the loop; supports dotted paths
    # via attrgetter's chained lookup.
    getter = attrgetter(attribute)
    seen = set()
    for candidate in objects:
        if not candidate:
            # Falsy entries (e.g. None) are silently skipped.
            continue
        value = getter(candidate)
        if value in seen:
            continue
        seen.add(value)
        yield candidate
def retry_delay(retry_attempt: int, delay_base: int = 1) -> float:
    """Compute an exponential-backoff delay for the given retry attempt.

    The exponent is capped at 5, limiting the delay to roughly 10.5x
    the base time.
    """
    capped_attempt = retry_attempt if retry_attempt < 5 else 5
    return round(delay_base * 1.6**capped_attempt, 1)
def flatten_capabilities(capabilities: Mapping[str, Union[Set[str], List[str]]]) -> List[Tuple[str, str]]:
    """Flatten a capabilities dictionary.

    This method takes a capabilities dictionary and flattens it into a
    list of key/value tuples describing all the platform properties
    that the capabilities map to. To do this, it assumes that all of the
    dictionary's values are iterable.

    For example,

    ``{'OSFamily': {'Linux'}, 'ISA': {'x86-32', 'x86-64'}}``

    becomes

    ``[('OSFamily', 'Linux'), ('ISA', 'x86-32'), ('ISA', 'x86-64')]``

    Args:
        capabilities (dict): The capabilities dictionary to flatten.

    Returns:
        list containing the flattened dictionary key-value tuples.
    """
    flattened: List[Tuple[str, str]] = []
    for name, values in capabilities.items():
        for value in values:
            flattened.append((name, value))
    return flattened