Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/disk.py: 98.46%
65 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-04-15 14:01 +0000
1# Copyright (C) 2018 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
16"""
17DiskStorage
18==================
20A CAS storage provider that stores files as blobs on disk.
21"""
23import errno
24import io
25import logging
26import os
27import tempfile
28from typing import IO, Dict, List, Optional
30from buildgrid._exceptions import StorageFullError
31from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
32from buildgrid.settings import MAX_IN_MEMORY_BLOB_SIZE_BYTES
34from .storage_abc import StorageABC
# Module-level logger named after this module's import path.
LOGGER = logging.getLogger(__name__)
class DiskStorage(StorageABC):
    """A CAS storage provider that stores blobs as individual files on disk.

    Blobs live under ``<root>/cas/objects/<hash[:2]>/<hash[2:]>``. Writes are
    staged in ``<root>/tmp`` and published with a hard link, so a blob never
    appears under its final name until its content is fully written.
    """

    def __init__(self, path: str) -> None:
        """Initialise the store rooted at *path*, creating its directories.

        Args:
            path: Root directory of the store. A relative path is resolved
                against the current working directory.
        """
        if not os.path.isabs(path):
            self.__root_path = os.path.abspath(path)
        else:
            self.__root_path = path
        self.__cas_path = os.path.join(self.__root_path, "cas")

        self.objects_path = os.path.join(self.__cas_path, "objects")
        self.temp_path = os.path.join(self.__root_path, "tmp")

        os.makedirs(self.objects_path, exist_ok=True)
        os.makedirs(self.temp_path, exist_ok=True)

    def has_blob(self, digest: Digest) -> bool:
        """Return ``True`` if a blob for *digest* exists on disk."""
        LOGGER.debug(f"Checking for blob: [{digest}]")
        return os.path.exists(self._get_object_path(digest))

    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        """Open the blob for *digest* for reading, or return ``None`` if absent."""
        LOGGER.debug(f"Getting blob: [{digest}]")
        try:
            f = open(self._get_object_path(digest), "rb")
            # TODO probably need to make StorageABC generic...?
            return io.BufferedReader(f)  # type: ignore[arg-type]
        except FileNotFoundError:
            return None

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        """Read several blobs at once; missing digests are silently omitted.

        Returns:
            A mapping from digest hash to blob content for each blob found.
        """
        LOGGER.debug(f"Getting {len(digests)} blobs")
        blobmap: Dict[str, bytes] = {}
        for digest in digests:
            blob = self.get_blob(digest)
            if blob is not None:
                with blob:
                    blobmap[digest.hash] = blob.read()
        return blobmap

    def delete_blob(self, digest: Digest) -> None:
        """Best-effort removal of the blob for *digest*."""
        LOGGER.debug(f"Deleting blob: [{digest}]")
        try:
            os.remove(self._get_object_path(digest))
        except OSError:
            # Deletion is best-effort: a missing file (or a concurrent
            # delete) is not an error.
            pass

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        """Persist the content of *write_session* as the blob for *digest*.

        The content is copied into a temporary file and then hard-linked to
        its final object path.

        Raises:
            StorageFullError: If the disk is full (``ENOSPC``) or the blob
                exceeds the filesystem's maximum file size (``EFBIG``).
        """
        LOGGER.debug(f"Writing blob: [{digest}]")
        object_path = self._get_object_path(digest)

        write_session.seek(0)
        try:
            with tempfile.NamedTemporaryFile("wb", dir=self.temp_path) as f:
                # Copy in bounded chunks so large blobs never have to fit in
                # memory all at once.
                while data := write_session.read(MAX_IN_MEMORY_BLOB_SIZE_BYTES):
                    f.write(data)
                # Flush before publishing the hard link: without this, any
                # buffered tail of the blob would only reach the (already
                # linked, and therefore visible) object file when the temp
                # file is closed, letting concurrent readers observe a
                # truncated blob.
                f.flush()
                os.makedirs(os.path.dirname(object_path), exist_ok=True)
                os.link(f.name, object_path)
        except FileExistsError:
            # Object is already there! CAS blobs are content-addressed and
            # immutable, so the existing file must match what we wrote.
            pass
        except OSError as e:
            # Not enough space error or file too large
            if e.errno in [errno.ENOSPC, errno.EFBIG]:
                raise StorageFullError(f"Disk Error: {e.errno}") from e
            raise e

    def _get_object_path(self, digest: Digest) -> str:
        # Shard objects into up to 256 subdirectories keyed on the first two
        # hash characters, keeping per-directory entry counts manageable.
        return os.path.join(self.objects_path, digest.hash[:2], digest.hash[2:])