Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/disk.py: 91.96%

112 statements  

coverage.py v7.4.1, created at 2025-05-28 16:48 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
DiskStorage
===========

A CAS storage provider that stores files as blobs on disk.
"""

import errno
import io
import os
import tempfile
from typing import IO

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
from buildgrid._protos.google.rpc import code_pb2
from buildgrid._protos.google.rpc.status_pb2 import Status
from buildgrid.server.decorators import timed
from buildgrid.server.exceptions import StorageFullError
from buildgrid.server.logging import buildgrid_logger
from buildgrid.server.metrics_names import METRIC
from buildgrid.server.settings import HASH, MAX_IN_MEMORY_BLOB_SIZE_BYTES

from .storage_abc import StorageABC

LOGGER = buildgrid_logger(__name__)


class DiskStorage(StorageABC):
    TYPE = "Disk"

    def __init__(self, path: str) -> None:
        if not os.path.isabs(path):
            self.__root_path = os.path.abspath(path)
        else:
            self.__root_path = path
        self.__cas_path = os.path.join(self.__root_path, "cas")

        self.objects_path = os.path.join(self.__cas_path, "objects")
        self.temp_path = os.path.join(self.__root_path, "tmp")

        os.makedirs(self.objects_path, exist_ok=True)
        os.makedirs(self.temp_path, exist_ok=True)
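
    # Resulting layout (illustrative): a blob whose hash starts "abcdef" is
    # stored at <root>/cas/objects/ab/cdef... (see _get_object_path below),
    # while <root>/tmp stages in-flight writes before commit_write and
    # bulk_update_blobs hard-link them into place.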

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        LOGGER.debug("Checking for blob.", tags=dict(digest=digest))
        return os.path.exists(self._get_object_path(digest))

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: list[Digest]) -> list[Digest]:
        return [digest for digest in digests if not os.path.exists(self._get_object_path(digest))]

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> IO[bytes] | None:
        LOGGER.debug("Getting blob.", tags=dict(digest=digest))
        try:
            f = open(self._get_object_path(digest), "rb")
            # TODO probably need to make StorageABC generic...?
            return io.BufferedReader(f)  # type: ignore[arg-type]
        except FileNotFoundError:
            return None
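
    # Note: a hit returns an open reader which the caller is presumably
    # responsible for closing; a miss returns None rather than raising.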

    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: list[Digest]) -> dict[str, bytes]:
        result = {}
        for digest in digests:
            try:
                with open(self._get_object_path(digest), "rb") as f:
                    result[digest.hash] = f.read()
            except FileNotFoundError:
                # Ignore files not found; they will be reported as NOT_FOUND higher up.
                pass
        return result
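
    # The result is keyed by digest hash; digests whose files are missing are
    # simply absent from the returned dict, per the comment above.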

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        LOGGER.debug("Deleting blob.", tags=dict(digest=digest))
        try:
            os.remove(self._get_object_path(digest))
        except OSError:
            pass

    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: list[Digest]) -> list[str]:
        failed_deletions = []
        for digest in digests:
            try:
                os.remove(self._get_object_path(digest))
            except FileNotFoundError:
                # Ignore files not found. Already deleted.
                pass
            except OSError:
                # If deletion threw an exception, assume it failed. More specific
                # implementations with more information can instead report whether
                # the blob was simply missing.
                LOGGER.warning("Unable to clean up digest.", tags=dict(digest=digest), exc_info=True)
                failed_deletions.append(digest.hash)

        return failed_deletions

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        LOGGER.debug("Writing blob.", tags=dict(digest=digest))
        object_path = self._get_object_path(digest)

        write_session.seek(0)
        try:
            # Stage the blob in a temporary file, then hard-link it into its
            # final location: the link either publishes the object atomically
            # or raises FileExistsError if another writer got there first.
            with tempfile.NamedTemporaryFile("wb", dir=self.temp_path) as f:
                while data := write_session.read(MAX_IN_MEMORY_BLOB_SIZE_BYTES):
                    f.write(data)
                os.makedirs(os.path.dirname(object_path), exist_ok=True)
                os.link(f.name, object_path)
        except FileExistsError:
            # Object is already there!
            pass
        except OSError as e:
            # Not enough space, or the file is too large for the filesystem.
            if e.errno in [errno.ENOSPC, errno.EFBIG]:
                raise StorageFullError(f"Disk Error: {e.errno}") from e
            raise e
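
    # Informal concurrency note: two writers committing the same digest each
    # stage their own temp file; the first os.link() wins, the loser's
    # FileExistsError is swallowed, and commit_write is idempotent per digest.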

    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: list[tuple[Digest, bytes]]) -> list[Status]:
        result = []
        for digest, data in blobs:
            object_path = self._get_object_path(digest)
            if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                result.append(Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash"))
                continue
            try:
                with tempfile.NamedTemporaryFile("wb", dir=self.temp_path) as f:
                    f.write(data)
                    os.makedirs(os.path.dirname(object_path), exist_ok=True)
                    os.link(f.name, object_path)
                result.append(Status(code=code_pb2.OK))
            except FileExistsError:
                # Object is already there!
                result.append(Status(code=code_pb2.OK))
            except OSError as e:
                code = code_pb2.INTERNAL
                if e.errno in [errno.ENOSPC, errno.EFBIG]:
                    code = code_pb2.RESOURCE_EXHAUSTED
                result.append(Status(code=code, message=f"Disk Error: {e}"))
        return result

    def _get_object_path(self, digest: Digest) -> str:
        return os.path.join(self.objects_path, digest.hash[:2], digest.hash[2:])
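
    # For example, a (hypothetical) digest hash "deadbeef..." maps to
    # "<objects_path>/de/adbeef...", a two-character fan-out that keeps any
    # single directory from accumulating an unmanageable number of entries.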