Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/disk.py: 98.46%

65 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-04-15 14:01 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17DiskStorage 

18================== 

19 

20A CAS storage provider that stores files as blobs on disk. 

21""" 

22 

23import errno 

24import io 

25import logging 

26import os 

27import tempfile 

28from typing import IO, Dict, List, Optional 

29 

30from buildgrid._exceptions import StorageFullError 

31from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

32from buildgrid.settings import MAX_IN_MEMORY_BLOB_SIZE_BYTES 

33 

34from .storage_abc import StorageABC 

35 

36LOGGER = logging.getLogger(__name__) 

37 

38 

class DiskStorage(StorageABC):
    """CAS storage provider that keeps every blob as a regular file on disk.

    Layout created under the configured root directory:

    * ``<root>/cas/objects/<hash[:2]>/<hash[2:]>`` -- the stored blobs.
    * ``<root>/tmp`` -- staging area for blobs that are still being written.

    New blobs are written to a temporary file in the staging area and then
    hard-linked to their final object path.
    """

    def __init__(self, path: str) -> None:
        """Set up the storage directories rooted at *path*.

        Args:
            path: Root directory of the store. A relative path is converted
                to an absolute one with ``os.path.abspath``.
        """
        if not os.path.isabs(path):
            self.__root_path = os.path.abspath(path)
        else:
            self.__root_path = path
        self.__cas_path = os.path.join(self.__root_path, "cas")

        self.objects_path = os.path.join(self.__cas_path, "objects")
        self.temp_path = os.path.join(self.__root_path, "tmp")

        os.makedirs(self.objects_path, exist_ok=True)
        os.makedirs(self.temp_path, exist_ok=True)

    def has_blob(self, digest: Digest) -> bool:
        """Return True if a file exists at the object path for *digest*."""
        LOGGER.debug(f"Checking for blob: [{digest}]")
        return os.path.exists(self._get_object_path(digest))

    def get_blob(self, digest: Digest) -> Optional[IO[bytes]]:
        """Open the blob for *digest* for binary reading.

        Returns:
            A buffered binary reader over the blob file, or ``None`` when no
            file exists for the digest. The caller owns the returned reader
            and is responsible for closing it.
        """
        LOGGER.debug(f"Getting blob: [{digest}]")
        try:
            f = open(self._get_object_path(digest), "rb")
            # TODO probably need to make StorageABC generic...?
            return io.BufferedReader(f)  # type: ignore[arg-type]
        except FileNotFoundError:
            return None

    def bulk_read_blobs(self, digests: List[Digest]) -> Dict[str, bytes]:
        """Read several blobs fully into memory.

        Args:
            digests: Digests of the blobs to read.

        Returns:
            A mapping from ``digest.hash`` to the blob contents. Digests with
            no stored blob are omitted from the mapping.
        """
        LOGGER.debug(f"Getting {len(digests)} blobs")
        blobmap: Dict[str, bytes] = {}
        for digest in digests:
            blob = self.get_blob(digest)
            if blob is not None:
                # The context manager closes the reader once it is consumed.
                with blob:
                    blobmap[digest.hash] = blob.read()
        return blobmap

    def delete_blob(self, digest: Digest) -> None:
        """Remove the blob for *digest* on a best-effort basis.

        A missing blob is not an error. Any other OS-level failure is logged
        and swallowed, so this method never raises ``OSError``.
        """
        LOGGER.debug(f"Deleting blob: [{digest}]")
        try:
            os.remove(self._get_object_path(digest))
        except FileNotFoundError:
            # Already gone: deleting is idempotent.
            pass
        except OSError as e:
            # Stay best-effort, but leave a trace instead of hiding the failure.
            LOGGER.warning(f"Failed to delete blob: [{digest}]: {e}")

    def commit_write(self, digest: Digest, write_session: IO[bytes]) -> None:
        """Persist the contents of *write_session* as the blob for *digest*.

        The session is rewound and copied in chunks of
        ``MAX_IN_MEMORY_BLOB_SIZE_BYTES`` into a temporary file, which is then
        hard-linked to the final object path. If the object already exists the
        call is a no-op.

        Args:
            digest: Digest identifying the blob.
            write_session: Seekable binary stream holding the blob contents.

        Raises:
            StorageFullError: If the OS reports ``ENOSPC`` or ``EFBIG``.
            OSError: For any other OS-level failure.
        """
        LOGGER.debug(f"Writing blob: [{digest}]")
        object_path = self._get_object_path(digest)

        write_session.seek(0)
        try:
            with tempfile.NamedTemporaryFile("wb", dir=self.temp_path) as f:
                while data := write_session.read(MAX_IN_MEMORY_BLOB_SIZE_BYTES):
                    f.write(data)
                # Flush before publishing: without this, bytes still held in
                # the userspace buffer are only written when the file closes,
                # i.e. after the object path is already visible. Readers could
                # observe a truncated blob, and a disk-full error raised by the
                # late flush would leave that truncated blob in the store.
                f.flush()
                os.makedirs(os.path.dirname(object_path), exist_ok=True)
                os.link(f.name, object_path)
        except FileExistsError:
            # Object is already there!
            pass
        except OSError as e:
            # Not enough space error or file too large
            if e.errno in (errno.ENOSPC, errno.EFBIG):
                raise StorageFullError(f"Disk Error: {e.errno}") from e
            raise

    def _get_object_path(self, digest: Digest) -> str:
        """Return the object path for *digest*.

        The first two characters of the hash name a subdirectory of the
        objects directory; the remaining characters name the file.
        """
        return os.path.join(self.objects_path, digest.hash[:2], digest.hash[2:])