Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2018 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14 

15 

16""" 

17DiskStorage 

18================== 

19 

20A CAS storage provider that stores files as blobs on disk. 

21""" 

22 

23import logging 

24import os 

25import tempfile 

26import io 

27 

28from .storage_abc import StorageABC 

29 

30 

class DiskStorage(StorageABC):
    """CAS storage provider that keeps every blob as a file on local disk.

    Blobs are stored at ``<path>/cas/objects/<hash[:2]>/<hash[2:]>``. Writes
    are staged as temporary files in ``<path>/tmp`` and hard-linked into the
    objects tree when committed.
    """

    def __init__(self, path):
        """Set up the storage directories below *path*.

        Args:
            path: Root directory of the store. A relative path is made
                absolute with ``os.path.abspath``; an absolute path is used
                as given.
        """
        self.__logger = logging.getLogger(__name__)

        if not os.path.isabs(path):
            self.__root_path = os.path.abspath(path)
        else:
            self.__root_path = path
        self.__cas_path = os.path.join(self.__root_path, 'cas')

        self.objects_path = os.path.join(self.__cas_path, 'objects')
        self.temp_path = os.path.join(self.__root_path, 'tmp')

        # Both trees sit under the same root; commit_write() hard-links from
        # temp_path into objects_path.
        os.makedirs(self.objects_path, exist_ok=True)
        os.makedirs(self.temp_path, exist_ok=True)

    def has_blob(self, digest):
        """Return True if a file exists at the object path for *digest*."""
        self.__logger.debug(f"Checking for blob: [{digest}]")
        return os.path.exists(self._get_object_path(digest))

    def get_blob(self, digest):
        """Open the blob for *digest* for binary reading.

        Returns:
            A buffered binary reader over the blob file, or ``None`` when no
            file exists for *digest*. The reader is not closed here; closing
            it is left to the caller.
        """
        self.__logger.debug(f"Getting blob: [{digest}]")
        try:
            # open() in 'rb' mode already returns an io.BufferedReader, so
            # wrapping it in another one would only add a second buffer.
            return open(self._get_object_path(digest), 'rb')
        except FileNotFoundError:
            return None

    def delete_blob(self, digest):
        """Remove the blob file for *digest*, ignoring any OS-level failure."""
        self.__logger.debug(f"Deleting blob: [{digest}]")
        try:
            os.remove(self._get_object_path(digest))
        except OSError:
            # Best-effort delete: a missing blob (or any other OSError) is
            # deliberately not reported to the caller.
            pass

    def begin_write(self, digest):
        """Start a write session backed by a temporary file in ``temp_path``.

        *digest* is not used here; the blob's final location is decided in
        commit_write().

        Returns:
            A named temporary file opened in ``"wb"`` mode.
        """
        return tempfile.NamedTemporaryFile("wb", dir=self.temp_path)

    def commit_write(self, digest, write_session):
        """Publish a finished write session as the blob for *digest*.

        The temporary file is hard-linked to the object path. If an object
        already exists there, it is left untouched. The session is closed in
        every case, including when linking fails.

        Args:
            digest: Digest identifying the blob.
            write_session: File object returned by begin_write().

        Raises:
            OSError: If the object directory cannot be created or the link
                fails for a reason other than the object already existing.
        """
        self.__logger.debug(f"Writing blob: [{digest}]")
        object_path = self._get_object_path(digest)

        try:
            os.makedirs(os.path.dirname(object_path), exist_ok=True)
            os.link(write_session.name, object_path)
        except FileExistsError:
            # Object is already there!
            pass
        finally:
            # Always release the temporary file, even when the link step
            # raised; otherwise the open handle and its temp name leak.
            write_session.close()

    def _get_object_path(self, digest):
        """Return the on-disk path for *digest*.

        The first two characters of ``digest.hash`` name a subdirectory of
        ``objects_path`` and the remaining characters name the file.
        """
        return os.path.join(self.objects_path, digest.hash[:2], digest.hash[2:])