Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/disk.py: 91.96%

112 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17DiskStorage 

18================== 

19 

20A CAS storage provider that stores files as blobs on disk. 

21""" 

22 

23import errno 

24import io 

25import os 

26import tempfile 

27from typing import IO, Dict, List, Optional, Tuple 

28 

29from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

30from buildgrid._protos.google.rpc import code_pb2 

31from buildgrid._protos.google.rpc.status_pb2 import Status 

32from buildgrid.server.decorators import timed 

33from buildgrid.server.exceptions import StorageFullError 

34from buildgrid.server.logging import buildgrid_logger 

35from buildgrid.server.metrics_names import METRIC 

36from buildgrid.server.settings import HASH, MAX_IN_MEMORY_BLOB_SIZE_BYTES 

37 

38from .storage_abc import StorageABC 

39 

# Module-level structured logger, scoped to this module's dotted path.
LOGGER = buildgrid_logger(__name__)

41 

42 

class DiskStorage(StorageABC):
    """CAS storage backend that persists each blob as a file on local disk.

    Blobs live at ``<root>/cas/objects/<hash[:2]>/<hash[2:]>``.  Writes are
    streamed to a temporary file under ``<root>/tmp`` and then hard-linked
    into place, so a blob only appears at its final, content-addressed path
    once its data has been fully written.
    """

    TYPE = "Disk"

    def __init__(self, path: str) -> None:
        """Initialize storage rooted at *path*, creating directories as needed.

        Args:
            path: Root directory for the storage.  A relative path is
                resolved against the current working directory.
        """
        # Normalise to an absolute root so all later joins are unambiguous.
        if not os.path.isabs(path):
            self.__root_path = os.path.abspath(path)
        else:
            self.__root_path = path
        self.__cas_path = os.path.join(self.__root_path, "cas")

        # Final blob location and the staging area used for atomic writes.
        self.objects_path = os.path.join(self.__cas_path, "objects")
        self.temp_path = os.path.join(self.__root_path, "tmp")

        os.makedirs(self.objects_path, exist_ok=True)
        os.makedirs(self.temp_path, exist_ok=True)

    @timed(METRIC.STORAGE.STAT_DURATION, type=TYPE)
    def has_blob(self, digest: Digest) -> bool:
        """Return ``True`` if a blob matching *digest* exists on disk."""
        LOGGER.debug("Checking for blob.", tags=dict(digest=digest))
        return os.path.exists(self._get_object_path(digest))

    @timed(METRIC.STORAGE.BULK_STAT_DURATION, type=TYPE)
    def missing_blobs(self, digests: List[Digest]) -> List[Digest]:
        """Return the subset of *digests* with no corresponding file on disk."""
        return [digest for digest in digests if not os.path.exists(self._get_object_path(digest))]

    @timed(METRIC.STORAGE.READ_DURATION, type=TYPE)
    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        """Return an open binary reader for *digest*, or ``None`` if absent."""
        LOGGER.debug("Getting blob.", tags=dict(digest=digest))
        try:
            # open(..., "rb") already returns a buffered reader; the previous
            # extra io.BufferedReader wrapper only added a redundant second
            # buffer layer (and needed a type: ignore).
            # TODO probably need to make StorageABC generic...?
            return open(self._get_object_path(digest), "rb")
        except FileNotFoundError:
            return None

    @timed(METRIC.STORAGE.BULK_READ_DURATION, type=TYPE)
    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        """Read many blobs at once.

        Returns:
            A mapping from digest hash to blob contents.  Digests with no
            blob on disk are simply omitted; callers report those as
            NOT_FOUND.
        """
        result: Dict[str, bytes] = {}
        for digest in digests:
            try:
                with open(self._get_object_path(digest), "rb") as f:
                    result[digest.hash] = f.read()
            except FileNotFoundError:
                # Ignore files not found, will be reported as NOT_FOUND higher up.
                pass
        return result

    @timed(METRIC.STORAGE.DELETE_DURATION, type=TYPE)
    def delete_blob(self, digest: Digest) -> None:
        """Best-effort deletion of the blob for *digest*; errors are ignored."""
        LOGGER.debug("Deleting blob.", tags=dict(digest=digest))
        try:
            os.remove(self._get_object_path(digest))
        except OSError:
            # Deliberately best-effort: a missing or undeletable file is
            # not an error for single-blob deletion.
            pass

    @timed(METRIC.STORAGE.BULK_DELETE_DURATION, type=TYPE)
    def bulk_delete(self, digests: List[Digest]) -> List[str]:
        """Delete many blobs, returning the hashes that could not be deleted."""
        failed_deletions: List[str] = []
        for digest in digests:
            try:
                os.remove(self._get_object_path(digest))
            except FileNotFoundError:
                # Ignore files not found. Already deleted.
                pass
            except OSError:
                # If deletion threw an exception, assume deletion failed. More specific implementations
                # with more information can return if a blob was missing instead
                LOGGER.warning("Unable to clean up digest.", tags=dict(digest=digest), exc_info=True)
                failed_deletions.append(digest.hash)

        return failed_deletions

    @timed(METRIC.STORAGE.WRITE_DURATION, type=TYPE)
    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        """Persist the contents of *write_session* under *digest*.

        The data is streamed to a temporary file and then hard-linked to its
        final object path, so readers never see a blob at the final path
        before it is complete.

        Raises:
            StorageFullError: If the disk is out of space or the file is too
                large (``ENOSPC``/``EFBIG``).
        """
        LOGGER.debug("Writing blob.", tags=dict(digest=digest))
        object_path = self._get_object_path(digest)

        write_session.seek(0)
        try:
            with tempfile.NamedTemporaryFile("wb", dir=self.temp_path) as f:
                # Stream in bounded chunks to avoid holding the blob in memory.
                while data := write_session.read(MAX_IN_MEMORY_BLOB_SIZE_BYTES):
                    f.write(data)
                # Flush buffered data BEFORE linking: the link makes the blob
                # visible at its final path, so its content must be complete
                # by then or a concurrent reader could see a truncated blob.
                f.flush()
                os.makedirs(os.path.dirname(object_path), exist_ok=True)
                os.link(f.name, object_path)
        except FileExistsError:
            # Object is already there! Content-addressed, so nothing to do.
            pass
        except OSError as e:
            # Not enough space error or file too large
            if e.errno in [errno.ENOSPC, errno.EFBIG]:
                raise StorageFullError(f"Disk Error: {e.errno}") from e
            raise

    @timed(METRIC.STORAGE.BULK_WRITE_DURATION, type=TYPE)
    def bulk_update_blobs(self, blobs: List[Tuple[Digest, bytes]]) -> List[Status]:
        """Write many blobs, validating each against its digest.

        Returns:
            One ``Status`` per input pair, in order: ``INVALID_ARGUMENT`` on
            size/hash mismatch, ``RESOURCE_EXHAUSTED`` on ENOSPC/EFBIG,
            ``INTERNAL`` on other OS errors, ``OK`` otherwise.
        """
        result = []
        for digest, data in blobs:
            object_path = self._get_object_path(digest)
            # Verify the payload actually matches the claimed digest.
            if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                result.append(Status(code=code_pb2.INVALID_ARGUMENT, message="Data doesn't match hash"))
                continue
            try:
                with tempfile.NamedTemporaryFile("wb", dir=self.temp_path) as f:
                    f.write(data)
                    # Flush before linking so the blob is complete once it
                    # becomes visible at its final path (see commit_write).
                    f.flush()
                    os.makedirs(os.path.dirname(object_path), exist_ok=True)
                    os.link(f.name, object_path)
                result.append(Status(code=code_pb2.OK))
            except FileExistsError:
                # Object is already there!
                result.append(Status(code=code_pb2.OK))
            except OSError as e:
                code = code_pb2.INTERNAL
                if e.errno in [errno.ENOSPC, errno.EFBIG]:
                    code = code_pb2.RESOURCE_EXHAUSTED
                result.append(Status(code=code, message=f"Disk Error: {e}"))
        return result

    def _get_object_path(self, digest: Digest) -> str:
        """Map *digest* to its on-disk path, sharded by the first two hash chars."""
        return os.path.join(self.objects_path, digest.hash[:2], digest.hash[2:])