Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/cas/storage/disk.py: 98.36%

61 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17DiskStorage 

18================== 

19 

20A CAS storage provider that stores files as blobs on disk. 

21""" 

22 

23import errno 

24import logging 

25import os 

26import tempfile 

27import io 

28 

29from buildgrid._exceptions import StorageFullError 

30from .storage_abc import StorageABC 

31 

32 

33class DiskStorage(StorageABC): 

34 

35 def __init__(self, path): 

36 self.__logger = logging.getLogger(__name__) 

37 

38 if not os.path.isabs(path): 

39 self.__root_path = os.path.abspath(path) 

40 else: 

41 self.__root_path = path 

42 self.__cas_path = os.path.join(self.__root_path, 'cas') 

43 

44 self.objects_path = os.path.join(self.__cas_path, 'objects') 

45 self.temp_path = os.path.join(self.__root_path, 'tmp') 

46 

47 os.makedirs(self.objects_path, exist_ok=True) 

48 os.makedirs(self.temp_path, exist_ok=True) 

49 

50 def has_blob(self, digest): 

51 self.__logger.debug(f"Checking for blob: [{digest}]") 

52 return os.path.exists(self._get_object_path(digest)) 

53 

54 def get_blob(self, digest): 

55 self.__logger.debug(f"Getting blob: [{digest}]") 

56 try: 

57 # pylint: disable=consider-using-with 

58 f = open(self._get_object_path(digest), 'rb') 

59 return io.BufferedReader(f) 

60 except FileNotFoundError: 

61 return None 

62 

63 def bulk_read_blobs(self, digests): 

64 self.__logger.debug(f"Getting {len(digests)} blobs") 

65 blobmap = {} 

66 for digest in digests: 

67 blob = self.get_blob(digest) 

68 if blob is not None: 

69 # Bulk read is for a potentially very large number of 

70 # small blobs. The total size of all blobs is limited by 

71 # the gRPC message size. Immediately read each blob to 

72 # avoid hitting open file limits. 

73 blobmap[digest.hash] = io.BytesIO(blob.read()) 

74 blob.close() 

75 return blobmap 

76 

77 def delete_blob(self, digest): 

78 self.__logger.debug(f"Deleting blob: [{digest}]") 

79 try: 

80 os.remove(self._get_object_path(digest)) 

81 except OSError: 

82 pass 

83 

84 def begin_write(self, digest): 

85 # pylint: disable=consider-using-with 

86 return tempfile.NamedTemporaryFile("wb", dir=self.temp_path) 

87 

88 def commit_write(self, digest, write_session): 

89 self.__logger.debug(f"Writing blob: [{digest}]") 

90 object_path = self._get_object_path(digest) 

91 

92 try: 

93 os.makedirs(os.path.dirname(object_path), exist_ok=True) 

94 os.link(write_session.name, object_path) 

95 except FileExistsError: 

96 # Object is already there! 

97 pass 

98 except OSError as e: 

99 # Not enough space error or file too large 

100 if e.errno in [errno.ENOSPC, errno.EFBIG]: 

101 raise StorageFullError(f"Disk Error: {e.errno}") from e 

102 raise e 

103 finally: 

104 write_session.close() 

105 

106 def _get_object_path(self, digest): 

107 return os.path.join(self.objects_path, digest.hash[:2], digest.hash[2:])