Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15# pylint: disable=multiple-statements 

16 

17from datetime import datetime 

18from typing import List 

19 

20from google.protobuf.duration_pb2 import Duration 

21from google.protobuf.timestamp_pb2 import Timestamp 

22from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer, String, false 

23from sqlalchemy.ext.declarative import declarative_base 

24from sqlalchemy.orm import relationship 

25 

26from ...._enums import LeaseState, OperationStage 

27from ...._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Digest 

28from ...._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata 

29from ...._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteResponse 

30from ...._protos.google.devtools.remoteworkers.v1test2 import bots_pb2 

31from ...._protos.google.rpc import code_pb2, status_pb2 

32from ...._protos.google.longrunning import operations_pb2 

33from ... import job 

34 

35 

36class Base: 

37 

38 """Base class which implements functionality relevant to all models.""" 

39 

40 def update(self, changes): 

41 for key, val in changes.items(): 

42 setattr(self, key, val) 

43 

44 

45Base = declarative_base(cls=Base) # type: ignore 

46 

47 

48class Job(Base): 

49 __tablename__ = 'jobs' 

50 

51 name = Column(String, primary_key=True) 

52 action_digest = Column(String, index=True, nullable=False) 

53 priority = Column(Integer, default=1, index=True, nullable=False) 

54 stage = Column(Integer, default=0, index=True, nullable=False) 

55 do_not_cache = Column(Boolean, default=False, nullable=False) 

56 cancelled = Column(Boolean, default=False, nullable=False) 

57 queued_timestamp = Column(DateTime, index=True) 

58 queued_time_duration = Column(Integer) 

59 worker_start_timestamp = Column(DateTime) 

60 worker_completed_timestamp = Column(DateTime) 

61 result = Column(String) 

62 assigned = Column(Boolean, default=False) 

63 n_tries = Column(Integer, default=0) 

64 platform_requirements = Column(String, nullable=True) 

65 status_code = Column(Integer, nullable=True) 

66 

67 leases = relationship('Lease', backref='job') 

68 active_states: List[int] = [ 

69 LeaseState.UNSPECIFIED.value, 

70 LeaseState.PENDING.value, 

71 LeaseState.ACTIVE.value 

72 ] 

73 active_leases = relationship( 

74 'Lease', 

75 primaryjoin=f'and_(Lease.job_name==Job.name, Lease.state.in_({active_states}))' 

76 ) 

77 

78 operations = relationship('Operation', backref='job') 

79 

80 def to_internal_job(self, data_store, no_result=False): 

81 # There should never be more than one active lease for a job. If we 

82 # have more than one for some reason, just take the first one. 

83 # TODO(SotK): Log some information here if there are multiple active 

84 # (ie. not completed or cancelled) leases. 

85 lease = self.active_leases[0].to_protobuf() if self.active_leases else None 

86 q_timestamp = Timestamp() 

87 if self.queued_timestamp: 

88 q_timestamp.FromDatetime(self.queued_timestamp) 

89 q_time_duration = Duration() 

90 if self.queued_time_duration: 

91 q_time_duration.FromSeconds(self.queued_time_duration) 

92 ws_timestamp = Timestamp() 

93 if self.worker_start_timestamp: 

94 ws_timestamp.FromDatetime(self.worker_start_timestamp) 

95 wc_timestamp = Timestamp() 

96 if self.worker_completed_timestamp: 

97 wc_timestamp.FromDatetime(self.worker_completed_timestamp) 

98 

99 if self.name in data_store.response_cache and not no_result: 

100 result = data_store.response_cache[self.name] 

101 elif self.result is not None and not no_result: 

102 result_digest = string_to_digest(self.result) 

103 result = data_store.storage.get_message(result_digest, ExecuteResponse) 

104 data_store.response_cache[self.name] = result 

105 else: 

106 result = None 

107 

108 return job.Job( 

109 self.do_not_cache, 

110 string_to_digest(self.action_digest), 

111 platform_requirements=self.platform_requirements, 

112 priority=self.priority, 

113 name=self.name, 

114 operations=[op.to_protobuf() for op in self.operations], 

115 cancelled_operations=set(op.name for op in self.operations if op.cancelled), 

116 lease=lease, 

117 stage=self.stage, 

118 cancelled=self.cancelled, 

119 queued_timestamp=q_timestamp, 

120 queued_time_duration=q_time_duration, 

121 worker_start_timestamp=ws_timestamp, 

122 worker_completed_timestamp=wc_timestamp, 

123 result=result, 

124 worker_name=self.active_leases[0].worker_name if self.active_leases else None, 

125 n_tries=self.n_tries, 

126 status_code=self.status_code 

127 ) 

128 

129 

130class Lease(Base): 

131 __tablename__ = 'leases' 

132 

133 id = Column(Integer, primary_key=True) 

134 job_name = Column(String, ForeignKey('jobs.name', ondelete='CASCADE', onupdate='CASCADE'), 

135 index=True, nullable=False) 

136 status = Column(Integer) 

137 state = Column(Integer, nullable=False) 

138 worker_name = Column(String, index=True, default=None) 

139 

140 def to_protobuf(self): 

141 lease = bots_pb2.Lease() 

142 lease.id = self.job_name 

143 lease.payload.Pack(string_to_digest(self.job.action_digest)) 

144 lease.state = self.state 

145 if self.status is not None: 

146 lease.status.code = self.status 

147 return lease 

148 

149 

150class Operation(Base): 

151 __tablename__ = 'operations' 

152 

153 name = Column(String, primary_key=True) 

154 job_name = Column(String, ForeignKey('jobs.name', ondelete='CASCADE', onupdate='CASCADE'), 

155 index=True, nullable=False) 

156 cancelled = Column(Boolean, default=False, nullable=False) 

157 

158 def to_protobuf(self): 

159 operation = operations_pb2.Operation() 

160 operation.name = self.name 

161 operation.done = self.job.stage == OperationStage.COMPLETED.value or self.cancelled 

162 operation.metadata.Pack(ExecuteOperationMetadata( 

163 stage=self.job.stage, 

164 action_digest=string_to_digest(self.job.action_digest))) 

165 if self.cancelled: 

166 operation.error.CopyFrom(status_pb2.Status(code=code_pb2.CANCELLED)) 

167 elif self.job.status_code != code_pb2.OK: 

168 operation.error.CopyFrom(status_pb2.Status(code=self.job.status_code)) 

169 

170 return operation 

171 

172 

173class IndexEntry(Base): 

174 __tablename__ = 'index' 

175 

176 digest_hash = Column(String, nullable=False, index=True, primary_key=True) 

177 digest_size_bytes = Column(Integer, nullable=False) 

178 accessed_timestamp = Column(DateTime, index=True, nullable=False) 

179 deleted = Column(Boolean, nullable=False, server_default=false()) 

180 

181 

182def digest_to_string(digest): 

183 return f'{digest.hash}/{digest.size_bytes}' 

184 

185 

186def string_to_digest(string): 

187 digest_hash, size_bytes = string.split('/', 1) 

188 return Digest(hash=digest_hash, size_bytes=int(size_bytes))