Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

119

120

121

122

123

124

125

126

127

128

129

130

131

132

# Copyright (C) 2018 Bloomberg LP 

# 

# Licensed under the Apache License, Version 2.0 (the "License"); 

# you may not use this file except in compliance with the License. 

# You may obtain a copy of the License at 

# 

# <http://www.apache.org/licenses/LICENSE-2.0> 

# 

# Unless required by applicable law or agreed to in writing, software 

# distributed under the License is distributed on an "AS IS" BASIS, 

# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

# See the License for the specific language governing permissions and 

# limitations under the License. 

 

 

""" 

StorageABC 

================== 

 

The abstract base class for storage providers. 

""" 

 

import abc 

 

from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

from buildgrid._protos.google.rpc.status_pb2 import Status 

from buildgrid._protos.google.rpc import code_pb2 

 

from ....settings import HASH 

 

 

class StorageABC(abc.ABC):
    """Abstract interface for content-addressable storage (CAS) providers.

    Concrete providers implement the five abstract primitives (`has_blob`,
    `get_blob`, `delete_blob`, `begin_write` and `commit_write`). The bulk
    and Protobuf-message helpers defined here are written purely in terms of
    those primitives, so subclasses get them for free but may override them.
    """

    @abc.abstractmethod
    def has_blob(self, digest):
        """Return True if the blob with the given instance/digest exists."""
        raise NotImplementedError()

    @abc.abstractmethod
    def get_blob(self, digest):
        """Return a file-like object containing the blob.

        If the blob isn't present in storage, return None.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def delete_blob(self, digest):
        """Delete the blob from storage if it's present."""
        # Raise like every other abstract primitive so that an accidental
        # ``super().delete_blob()`` call fails loudly instead of doing nothing.
        raise NotImplementedError()

    @abc.abstractmethod
    def begin_write(self, digest):
        """Return a file-like object to which a blob's contents could be
        written.
        """
        raise NotImplementedError()

    @abc.abstractmethod
    def commit_write(self, digest, write_session):
        """Commit the write operation. `write_session` must be an object
        returned by `begin_write`.

        The storage object is not responsible for verifying that the data
        written to the write_session actually matches the digest. The caller
        must do that.
        """
        raise NotImplementedError()

    def missing_blobs(self, digests):
        """Return a list of the given digests whose blobs are not in CAS.

        Presence is checked with one `has_blob` call per digest; the input
        order is preserved in the result.
        """
        return [digest for digest in digests if not self.has_blob(digest)]

    def bulk_update_blobs(self, blobs):
        """Given a container of (digest, value) tuples, add all the blobs
        to CAS. Return a list of Status objects corresponding to the
        result of uploading each of the blobs.

        Unlike in `commit_write`, the storage object will verify that each of
        the digests matches the provided data. A mismatch yields an
        INVALID_ARGUMENT status; an IOError raised while writing yields an
        UNKNOWN status carrying the error text. Other exceptions propagate.
        """
        result = []
        for digest, data in blobs:
            # Check the cheap size comparison first; only hash when it matches.
            if len(data) != digest.size_bytes or HASH(data).hexdigest() != digest.hash:
                result.append(
                    Status(
                        code=code_pb2.INVALID_ARGUMENT,
                        message="Data doesn't match hash",
                    ))
                continue

            try:
                write_session = self.begin_write(digest)
                write_session.write(data)
                self.commit_write(digest, write_session)
            except IOError as ex:
                result.append(
                    Status(code=code_pb2.UNKNOWN, message=str(ex)))
            else:
                result.append(Status(code=code_pb2.OK))
        return result

    def bulk_read_blobs(self, digests):
        """Given an iterable container of digests, return a
        {hash: file-like object} dictionary corresponding to the blobs
        represented by the input digests.

        Each value is whatever `get_blob` returns for that digest, so it is
        None for a blob that is not present.
        """
        return {digest.hash: self.get_blob(digest) for digest in digests}

    def put_message(self, message):
        """Store the given Protobuf message in CAS, returning its digest."""
        message_blob = message.SerializeToString()
        digest = Digest(hash=HASH(message_blob).hexdigest(),
                        size_bytes=len(message_blob))
        session = self.begin_write(digest)
        session.write(message_blob)
        self.commit_write(digest, session)
        return digest

    def get_message(self, digest, message_type):
        """Retrieve the Protobuf message with the given digest and type from
        CAS. If the blob is not present, returns None.

        The file-like object obtained from `get_blob` is always closed, even
        when reading or parsing it raises.
        """
        message_blob = self.get_blob(digest)
        if message_blob is None:
            return None
        try:
            return message_type.FromString(message_blob.read())
        finally:
            # Release the blob handle on the error path as well.
            message_blob.close()