Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/disk.py: 98.41% (63 statements)
coverage.py v7.2.7, created at 2023-06-05 15:37 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16"""
17DiskStorage
18==================
20A CAS storage provider that stores files as blobs on disk.
21"""

import errno
import io
import logging
import os
import tempfile
from typing import IO, Dict, List, Optional

from buildgrid._exceptions import StorageFullError
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest

from .storage_abc import StorageABC

LOGGER = logging.getLogger(__name__)


class DiskStorage(StorageABC):
    def __init__(self, path: str) -> None:
        if not os.path.isabs(path):
            self.__root_path = os.path.abspath(path)
        else:
            self.__root_path = path
        self.__cas_path = os.path.join(self.__root_path, "cas")

        self.objects_path = os.path.join(self.__cas_path, "objects")
        self.temp_path = os.path.join(self.__root_path, "tmp")
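
        # The temp directory sits under the same root as the object store,
        # likely so the hard link in commit_write() never has to cross a
        # filesystem boundary (os.link() cannot link across filesystems).
        # Resulting on-disk layout (a sketch):
        #
        #     <root>/cas/objects/ab/<rest-of-hash>   committed blobs
        #     <root>/tmp/                            in-flight writes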
        os.makedirs(self.objects_path, exist_ok=True)
        os.makedirs(self.temp_path, exist_ok=True)

    def has_blob(self, digest: Digest) -> bool:
        LOGGER.debug(f"Checking for blob: [{digest}]")
        return os.path.exists(self._get_object_path(digest))

    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        LOGGER.debug(f"Getting blob: [{digest}]")
        try:
            f = open(self._get_object_path(digest), "rb")
            # TODO probably need to make StorageABC generic...?
            return io.BufferedReader(f)  # type: ignore[arg-type]
        except FileNotFoundError:
            return None

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, IO[bytes]]:
        LOGGER.debug(f"Getting {len(digests)} blobs")
        blobmap = {}
        for digest in digests:
            blob = self.get_blob(digest)
            if blob is not None:
                # Bulk read is for a potentially very large number of
                # small blobs. The total size of all blobs is limited by
                # the gRPC message size. Immediately read each blob to
                # avoid hitting open file limits.
                blobmap[digest.hash] = io.BytesIO(blob.read())
                blob.close()
        # TODO probably need to make StorageABC generic...?
        return blobmap  # type: ignore[return-value]

    def delete_blob(self, digest: Digest) -> None:
        LOGGER.debug(f"Deleting blob: [{digest}]")
        try:
            os.remove(self._get_object_path(digest))
        except OSError:
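            # Deletion is best-effort: a blob that is already gone or a
            # transient filesystem error is deliberately ignored.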
            pass

    def begin_write(self, digest: Digest) -> IO[bytes]:
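        # NamedTemporaryFile deletes its backing file on close; commit_write()
        # below hard-links the finished file into the object store before
        # closing it, so the committed object survives that cleanup.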
        return tempfile.NamedTemporaryFile("wb", dir=self.temp_path)

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        LOGGER.debug(f"Writing blob: [{digest}]")
        object_path = self._get_object_path(digest)

        try:
            os.makedirs(os.path.dirname(object_path), exist_ok=True)
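            # os.link() publishes the fully written temporary file atomically:
            # readers either see the complete blob or no blob at all, and two
            # concurrent writers of the same digest race harmlessly.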
            os.link(write_session.name, object_path)
        except FileExistsError:
            # Object is already there!
            pass
        except OSError as e:
            # Not enough space error or file too large
            if e.errno in [errno.ENOSPC, errno.EFBIG]:
                raise StorageFullError(f"Disk Error: {e.errno}") from e
            raise e
        finally:
            write_session.close()

    def _get_object_path(self, digest: Digest) -> str:
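        # Fan blobs out over subdirectories keyed by the first two hex
        # characters of the digest, e.g. a blob with hash "abcd..." is
        # stored at "<objects>/ab/cd...".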
        return os.path.join(self.objects_path, digest.hash[:2], digest.hash[2:])