Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/disk.py: 98.36% (61 statements)

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16"""
17DiskStorage
18==================
20A CAS storage provider that stores files as blobs on disk.
21"""

import errno
import io
import logging
import os
import tempfile

from buildgrid._exceptions import StorageFullError
from .storage_abc import StorageABC


class DiskStorage(StorageABC):
    """Stores blobs on the local filesystem, laid out by content digest."""

    def __init__(self, path):
        self.__logger = logging.getLogger(__name__)

        if not os.path.isabs(path):
            self.__root_path = os.path.abspath(path)
        else:
            self.__root_path = path
        self.__cas_path = os.path.join(self.__root_path, 'cas')

        # Keep the temporary directory on the same filesystem as the object
        # store, so that commit_write() can hard-link staged files into place.
        self.objects_path = os.path.join(self.__cas_path, 'objects')
        self.temp_path = os.path.join(self.__root_path, 'tmp')

        os.makedirs(self.objects_path, exist_ok=True)
        os.makedirs(self.temp_path, exist_ok=True)

    def has_blob(self, digest):
        self.__logger.debug(f"Checking for blob: [{digest}]")
        return os.path.exists(self._get_object_path(digest))

    def get_blob(self, digest):
        self.__logger.debug(f"Getting blob: [{digest}]")
        try:
            # pylint: disable=consider-using-with
            f = open(self._get_object_path(digest), 'rb')
            return io.BufferedReader(f)
        except FileNotFoundError:
            # A missing file simply means the blob is not stored here.
            return None

    def bulk_read_blobs(self, digests):
        self.__logger.debug(f"Getting {len(digests)} blobs")
        blobmap = {}
        for digest in digests:
            blob = self.get_blob(digest)
            if blob is not None:
                # Bulk read is for a potentially very large number of
                # small blobs. The total size of all blobs is limited by
                # the gRPC message size. Immediately read each blob to
                # avoid hitting open file limits.
                blobmap[digest.hash] = io.BytesIO(blob.read())
                blob.close()
        return blobmap

    def delete_blob(self, digest):
        self.__logger.debug(f"Deleting blob: [{digest}]")
        try:
            os.remove(self._get_object_path(digest))
        except OSError:
            # Deleting a blob that is already gone is not an error.
            pass

    def begin_write(self, digest):
        # The temporary file is created in `temp_path` so that it lives on the
        # same filesystem as the object store; it is deleted automatically
        # when closed, which commit_write() does after linking it into place.
        # pylint: disable=consider-using-with
        return tempfile.NamedTemporaryFile("wb", dir=self.temp_path)

    def commit_write(self, digest, write_session):
        self.__logger.debug(f"Writing blob: [{digest}]")
        object_path = self._get_object_path(digest)

        try:
            os.makedirs(os.path.dirname(object_path), exist_ok=True)
            os.link(write_session.name, object_path)
        except FileExistsError:
            # Object is already there: the store is content-addressed, so an
            # existing file at this path already holds the same content.
            pass
        except OSError as e:
            # Not enough space error or file too large
            if e.errno in [errno.ENOSPC, errno.EFBIG]:
                raise StorageFullError(f"Disk Error: {e.errno}") from e
            raise
        finally:
            write_session.close()

    def _get_object_path(self, digest):
        # Fan blobs out across 256 subdirectories, keyed by the first two hex
        # characters of the hash, to keep individual directories small.
        return os.path.join(self.objects_path, digest.hash[:2], digest.hash[2:])
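

# ---------------------------------------------------------------------------
# Usage sketch (not part of the original module): a minimal walk through the
# write/read cycle under some assumptions. `Digest` here is a hypothetical
# stand-in for the REAPI Digest message; this storage class only relies on
# its `hash` attribute. The demo path '/tmp/cas-demo' is arbitrary.
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    import hashlib
    from collections import namedtuple

    Digest = namedtuple('Digest', ['hash', 'size_bytes'])  # hypothetical stand-in

    data = b'example blob'
    digest = Digest(hash=hashlib.sha256(data).hexdigest(), size_bytes=len(data))

    storage = DiskStorage('/tmp/cas-demo')

    # Stage the blob in the temporary area, then hard-link it into place.
    session = storage.begin_write(digest)
    session.write(data)
    session.flush()  # ensure the bytes are on disk before commit links them
    storage.commit_write(digest, session)

    assert storage.has_blob(digest)
    with storage.get_blob(digest) as blob:
        assert blob.read() == data

    # Bulk reads return fully-buffered BytesIO objects keyed by hash.
    blobs = storage.bulk_read_blobs([digest])
    assert blobs[digest.hash].read() == data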