Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/disk.py: 91.96%
112 statements
coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16"""
17DiskStorage
18==================
20A CAS storage provider that stores files as blobs on disk.
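
Example usage (an illustrative sketch; the path and ``digest`` values are
hypothetical, not taken from this module)::

    storage = DiskStorage("/var/lib/buildgrid")
    if storage.has_blob(digest):
        reader = storage.get_blob(digest)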
21"""
23import errno
24import io
25import os
26import tempfile
27from typing import IO, Dict, List, Optional, Tuple
29from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
30from buildgrid._protos.google.rpc import code_pb2
31from buildgrid._protos.google.rpc.status_pb2 import Status
32from buildgrid.server.decorators import timed
33from buildgrid.server.exceptions import StorageFullError
34from buildgrid.server.logging import buildgrid_logger
35from buildgrid.server.metrics_names import METRIC
36from buildgrid.server.settings import HASH, MAX_IN_MEMORY_BLOB_SIZE_BYTES
38from .storage_abc import StorageABC
40LOGGER = buildgrid_logger(__name__)


class DiskStorage(StorageABC):
    TYPE = "Disk"

    def __init__(self, path: str) -> None:
        if not os.path.isabs(path):
            self.__root_path = os.path.abspath(path)
        else:
            self.__root_path = path
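
        # On-disk layout (as constructed below): blobs live under
        # ``<root>/cas/objects`` and in-progress writes are staged in
        # ``<root>/tmp`` on the same filesystem, so completed writes can be
        # hard-linked into place.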
        self.__cas_path = os.path.join(self.__root_path, "cas")

        self.objects_path = os.path.join(self.__cas_path, "objects")
        self.temp_path = os.path.join(self.__root_path, "tmp")

        os.makedirs(self.objects_path, exist_ok=True)
        os.makedirs(self.temp_path, exist_ok=True)

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        LOGGER.debug("Checking for blob.", tags=dict(digest=digest))
        return os.path.exists(self._get_object_path(digest))

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        return [digest for digest in digests if not os.path.exists(self._get_object_path(digest))]

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        LOGGER.debug("Getting blob.", tags=dict(digest=digest))
        try:
            f = open(self._get_object_path(digest), "rb")
            # TODO probably need to make StorageABC generic...?
            return io.BufferedReader(f)  # type: ignore[arg-type]
        except FileNotFoundError:
            return None

    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        result = {}
        for digest in digests:
            try:
                with open(self._get_object_path(digest), "rb") as f:
                    result[digest.hash] = f.read()
            except FileNotFoundError:
                # Ignore missing files; they are reported as NOT_FOUND higher up.
                pass
        return result

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        LOGGER.debug("Deleting blob.", tags=dict(digest=digest))
        try:
            os.remove(self._get_object_path(digest))
        except OSError:
            # Removal errors (including a missing blob) are silently ignored.
            pass

    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        failed_deletions = []
        for digest in digests:
            try:
                os.remove(self._get_object_path(digest))
            except FileNotFoundError:
                # Ignore missing files; they have already been deleted.
                pass
            except OSError:
                # If deletion raised any other exception, assume it failed. Implementations
                # with more information could report a missing blob instead.
                LOGGER.warning("Unable to clean up digest.", tags=dict(digest=digest), exc_info=True)
                failed_deletions.append(digest.hash)

        return failed_deletions

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        LOGGER.debug("Writing blob.", tags=dict(digest=digest))
        object_path = self._get_object_path(digest)

        write_session.seek(0)
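        # Stream the session into a temporary file in chunks, then hard-link
        # it to its final path. Linking raises FileExistsError if the blob is
        # already present, and the temporary name is removed when the context
        # manager exits either way.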
        try:
            with tempfile.NamedTemporaryFile("wb", dir=self.temp_path) as f:
                while data := write_session.read(MAX_IN_MEMORY_BLOB_SIZE_BYTES):
                    f.write(data)
                os.makedirs(os.path.dirname(object_path), exist_ok=True)
                os.link(f.name, object_path)
        except FileExistsError:
            # Object is already there!
            pass
        except OSError as e:
            # Out of disk space, or the file is too large for the filesystem
            if e.errno in [errno.ENOSPC, errno.EFBIG]:
                raise StorageFullError(f"Disk Error: {e.errno}") from e
            raise

    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        result = []
        for digest, data in blobs:
            object_path = self._get_object_path(digest)
            if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                result.append(Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash"))
                continue
            try:
                with tempfile.NamedTemporaryFile("wb", dir=self.temp_path) as f:
                    f.write(data)
                    os.makedirs(os.path.dirname(object_path), exist_ok=True)
                    os.link(f.name, object_path)
                result.append(Status(code=code_pb2.OK))
            except FileExistsError:
                # Object is already there!
                result.append(Status(code=code_pb2.OK))
            except OSError as e:
                code = code_pb2.INTERNAL
                if e.errno in [errno.ENOSPC, errno.EFBIG]:
                    code = code_pb2.RESOURCE_EXHAUSTED
                result.append(Status(code=code, message=f"Disk Error: {e}"))
        return result
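
    # Blobs are fanned out across subdirectories keyed on the first two
    # characters of the digest hash, so no single directory grows unbounded;
    # e.g. a digest beginning "ab12..." lands at ``objects/ab/12...``
    # (hash value illustrative).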
    def _get_object_path(self, digest: Digest) -> str:
        return os.path.join(self.objects_path, digest.hash[:2], digest.hash[2:])