Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/disk.py: 98.41%

63 statements  

coverage.py v7.2.7, created at 2023-06-05 15:37 +0000

# Copyright (C) 2018 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
DiskStorage
==================

A CAS storage provider that stores files as blobs on disk.
"""

import errno
import io
import logging
import os
import tempfile
from typing import IO, Dict, List, Optional

from buildgrid._exceptions import StorageFullError
from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest

from .storage_abc import StorageABC

LOGGER = logging.getLogger(__name__)


class DiskStorage(StorageABC):
    def __init__(self, path: str) -> None:
        if not os.path.isabs(path):
            self.__root_path = os.path.abspath(path)
        else:
            self.__root_path = path
        self.__cas_path = os.path.join(self.__root_path, "cas")

        self.objects_path = os.path.join(self.__cas_path, "objects")
        self.temp_path = os.path.join(self.__root_path, "tmp")

        os.makedirs(self.objects_path, exist_ok=True)
        os.makedirs(self.temp_path, exist_ok=True)

    def has_blob(self, digest: Digest) -> bool:
        LOGGER.debug(f"Checking for blob: [{digest}]")
        return os.path.exists(self._get_object_path(digest))

    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        LOGGER.debug(f"Getting blob: [{digest}]")
        try:
            f = open(self._get_object_path(digest), "rb")
            # TODO probably need to make StorageABC generic...?
            return io.BufferedReader(f)  # type: ignore[arg-type]
        except FileNotFoundError:
            return None

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, IO[bytes]]:
        LOGGER.debug(f"Getting {len(digests)} blobs")
        blobmap = {}
        for digest in digests:
            blob = self.get_blob(digest)
            if blob is not None:
                # Bulk read is for a potentially very large number of
                # small blobs. The total size of all blobs is limited by
                # the gRPC message size. Immediately read each blob to
                # avoid hitting open file limits.
                blobmap[digest.hash] = io.BytesIO(blob.read())
                blob.close()
        # TODO probably need to make StorageABC generic...?
        return blobmap  # type: ignore[return-value]

    def delete_blob(self, digest: Digest) -> None:
        LOGGER.debug(f"Deleting blob: [{digest}]")
        try:
            os.remove(self._get_object_path(digest))
        except OSError:
            # Removal failures (e.g. the blob is already gone) are ignored.
            pass

    def begin_write(self, digest: Digest) -> IO[bytes]:
        # Stage the upload in a named temporary file; commit_write() will
        # hard-link it into the object tree and then close it, which also
        # removes the temporary name.
        return tempfile.NamedTemporaryFile("wb", dir=self.temp_path)

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        LOGGER.debug(f"Writing blob: [{digest}]")
        object_path = self._get_object_path(digest)

        try:
            os.makedirs(os.path.dirname(object_path), exist_ok=True)
            # Hard-link the staged temporary file into its final location;
            # the link either succeeds or the blob already exists. This
            # requires temp_path and objects_path to be on the same
            # filesystem (both are created under the same root in __init__).
            os.link(write_session.name, object_path)
        except FileExistsError:
            # Object is already there!
            pass
        except OSError as e:
            # Not enough space error or file too large
            if e.errno in [errno.ENOSPC, errno.EFBIG]:
                raise StorageFullError(f"Disk Error: {e.errno}") from e
            raise e
        finally:
            write_session.close()

    def _get_object_path(self, digest: Digest) -> str:
        # Objects are sharded into subdirectories named after the first two
        # hex characters of the digest hash (e.g. "ab/cdef..." for "abcdef...").
        return os.path.join(self.objects_path, digest.hash[:2], digest.hash[2:])
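
A minimal usage sketch of the module above, assuming a writable scratch
directory: the root path and the literal blob contents here are hypothetical,
and in a real BuildGrid deployment digests arrive via the remote execution
API rather than being computed by hand.

    import hashlib

    from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest
    from buildgrid.server.cas.storage.disk import DiskStorage

    storage = DiskStorage("/tmp/cas-demo")  # hypothetical root path

    data = b"hello, cas"
    digest = Digest(hash=hashlib.sha256(data).hexdigest(), size_bytes=len(data))

    # Upload: stage the bytes in a temp file, then commit (hard-link into place).
    session = storage.begin_write(digest)
    session.write(data)
    storage.commit_write(digest, session)

    # Download: the blob comes back as a readable binary stream.
    assert storage.has_blob(digest)
    blob = storage.get_blob(digest)
    assert blob is not None
    assert blob.read() == data
    blob.close()

    # Bulk read returns fully-buffered streams keyed by digest hash.
    blobs = storage.bulk_read_blobs([digest])
    assert blobs[digest.hash].read() == data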