Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/actioncache/caches/action_cache_abc.py: 94.12%
85 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2020 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16from abc import ABC, abstractmethod
17from typing import Any, Optional, Set, TypeVar
19from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action, ActionResult, Digest, Tree
20from buildgrid.server.cas.instance import EMPTY_BLOB_DIGEST
21from buildgrid.server.cas.storage.storage_abc import StorageABC
22from buildgrid.server.exceptions import NotFoundError
23from buildgrid.server.logging import buildgrid_logger
24from buildgrid.server.metrics_names import METRIC
25from buildgrid.server.metrics_utils import publish_counter_metric
26from buildgrid.server.utils.digests import HashableDigest
# Module-level structured logger, namespaced to this module's import path.
LOGGER = buildgrid_logger(__name__)

# Type variable bound to ActionCacheABC so that __enter__ can return the
# concrete subclass type (enables `with SomeCache(...) as cache:` to type-check
# `cache` as the subclass rather than the base class).
T = TypeVar("T", bound="ActionCacheABC")
class ActionCacheABC(ABC):
    """Interface shared by all action cache backends.

    Subclasses must implement ``get_action_result`` and
    ``update_action_result``. The base class optionally wraps a CAS storage
    backend, which — when present — is used to verify that cached results
    still reference blobs that exist in CAS.
    """

    def __init__(self, allow_updates: bool = False, storage: Optional[StorageABC] = None):
        # Whether clients may write new results into this cache.
        self._allow_updates = allow_updates
        # Optional CAS backend used for blob-existence validation.
        self._storage = storage

    @property
    def allow_updates(self) -> bool:
        """True if this cache accepts updates to stored action results."""
        return self._allow_updates

    def __enter__(self: T) -> T:
        self.start()
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        self.stop()

    def start(self) -> None:
        """Start the wrapped storage backend, when one is configured."""
        if self._storage is not None:
            self._storage.start()

    def stop(self) -> None:
        """Stop the wrapped storage backend (if any), then log shutdown."""
        if self._storage is not None:
            self._storage.stop()
        LOGGER.info("Stopped ActionCache.")

    @abstractmethod
    def get_action_result(self, action_digest: Digest) -> ActionResult:
        raise NotImplementedError()

    @abstractmethod
    def update_action_result(self, action_digest: Digest, action_result: ActionResult) -> None:
        raise NotImplementedError()

    def referenced_blobs_still_exist(self, action_digest: Digest, action_result: ActionResult) -> bool:
        """Validate referenced blobs, publishing an invalid-cache metric on failure."""
        still_exist = self._referenced_blobs_still_exist(action_digest, action_result)
        if not still_exist:
            publish_counter_metric(METRIC.ACTION_CACHE.INVALID_CACHE_COUNT, 1)
        return still_exist

    def _referenced_blobs_still_exist(self, action_digest: Digest, action_result: ActionResult) -> bool:
        """Checks CAS for Action and ActionResult output blobs existence.

        Args:
            action_digest (Digest): Digest for the Action whose top level digests
                will be searched for.
            action_result (ActionResult): ActionResult to search referenced
                output blobs for.

        Returns:
            True if all referenced blobs are present in CAS, False otherwise.
        """
        storage = self._storage
        if not storage:
            # Without a CAS backend there is nothing to validate against.
            return True

        needed: Set[HashableDigest] = set()

        for file_output in action_result.output_files:
            needed.add(HashableDigest(file_output.digest.hash, file_output.digest.size_bytes))

        for dir_output in action_result.output_directories:
            if dir_output.HasField("tree_digest"):
                if not self._collect_tree_blobs(storage, dir_output.tree_digest, needed):
                    return False
            elif dir_output.HasField("root_directory_digest"):
                if not self._collect_root_directory_blobs(storage, dir_output.root_directory_digest, needed):
                    return False

        # stdout/stderr only need to exist in CAS when they aren't inlined raw.
        if action_result.stdout_digest.hash and not action_result.stdout_raw:
            needed.add(HashableDigest(action_result.stdout_digest.hash, action_result.stdout_digest.size_bytes))
        if action_result.stderr_digest.hash and not action_result.stderr_raw:
            needed.add(HashableDigest(action_result.stderr_digest.hash, action_result.stderr_digest.size_bytes))

        # Additionally refresh the TTL of the ActionDigest and the top level
        # digests contained within. This keeps the Action around for use cases
        # like bgd-browser, where both the Action and its ActionResult are
        # inspected, with minimal overhead. Unlike ``needed``, these blobs are
        # allowed to be missing without invalidating the result.
        refresh_only: Set[HashableDigest] = set()
        action = storage.get_message(action_digest, Action)
        if action:
            refresh_only.add(HashableDigest(action_digest.hash, action_digest.size_bytes))
            refresh_only.add(HashableDigest(action.command_digest.hash, action.command_digest.size_bytes))
            refresh_only.add(HashableDigest(action.input_root_digest.hash, action.input_root_digest.size_bytes))

        to_check = needed | refresh_only
        # No need to check the underlying storage for the empty blob as it is a
        # special case blob which always exists; it may not actually be present
        # in the underlying storage, so never require it.
        to_check.discard(HashableDigest(EMPTY_BLOB_DIGEST.hash, EMPTY_BLOB_DIGEST.size_bytes))

        missing = storage.missing_blobs([hd.to_digest() for hd in to_check])
        required_missing = [blob for blob in missing if HashableDigest(blob.hash, blob.size_bytes) in needed]
        if required_missing:
            LOGGER.debug(
                "Missing blobs.", tags=dict(required_missing=len(required_missing), blobs_needed=len(needed))
            )
            return False
        return True

    def _collect_tree_blobs(self, storage: StorageABC, tree_digest: Digest, needed: Set[HashableDigest]) -> bool:
        """Record all blobs referenced by a Tree message into ``needed``.

        Returns False when the Tree message itself cannot be fetched from CAS.
        """
        needed.add(HashableDigest(tree_digest.hash, tree_digest.size_bytes))
        tree = storage.get_message(tree_digest, Tree)
        if tree is None:
            return False
        for file_node in tree.root.files:
            needed.add(HashableDigest(file_node.digest.hash, file_node.digest.size_bytes))
        for subdirectory in tree.children:
            for file_node in subdirectory.files:
                needed.add(HashableDigest(file_node.digest.hash, file_node.digest.size_bytes))
        return True

    def _collect_root_directory_blobs(
        self, storage: StorageABC, root_digest: Digest, needed: Set[HashableDigest]
    ) -> bool:
        """Record all blobs reachable from a root Directory digest into ``needed``.

        Returns False when any sub-directory of the tree is missing from CAS.
        """
        needed.add(HashableDigest(root_digest.hash, root_digest.size_bytes))
        try:
            for directory in storage.get_tree(root_digest, raise_on_missing_subdir=True):
                needed.update(
                    HashableDigest(file_node.digest.hash, file_node.digest.size_bytes)
                    for file_node in directory.files
                )
                needed.update(
                    HashableDigest(dir_node.digest.hash, dir_node.digest.size_bytes)
                    for dir_node in directory.directories
                )
        except NotFoundError:
            return False
        return True