Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/persistence/sql/models.py: 99.28%

139 statements  

« prev     ^ index     » next       coverage.py v6.4.1, created at 2022-06-22 21:04 +0000

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14# 

15# pylint: disable=multiple-statements 

16 

17from typing import List 

18 

19from google.protobuf.duration_pb2 import Duration 

20from google.protobuf.timestamp_pb2 import Timestamp 

21from sqlalchemy import BigInteger, Boolean, Column, DateTime, ForeignKey, Index, Integer, LargeBinary, String, false 

22from sqlalchemy.ext.declarative import declarative_base 

23from sqlalchemy.orm import relationship 

24 

25from buildgrid.utils import BrowserURL 

26from ...._enums import LeaseState, OperationStage 

27from ...._exceptions import NotFoundError 

28from ...._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action, Digest 

29from ...._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata 

30from ...._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteResponse 

31from ...._protos.google.devtools.remoteworkers.v1test2 import bots_pb2 

32from ...._protos.google.rpc import code_pb2, status_pb2 

33from ...._protos.google.longrunning import operations_pb2 

34from ... import job 

35 

36 

class Base:
    """Mixin carrying behaviour shared by every ORM model in this module."""

    def update(self, changes):
        """Copy each ``changes`` entry onto this instance as an attribute.

        Args:
            changes: Mapping of attribute name to the value to assign.
        """
        for attribute, value in changes.items():
            setattr(self, attribute, value)


# Rebind ``Base`` to the declarative base built from the mixin, so the model
# classes that subclass ``Base`` inherit ``update``.
Base = declarative_base(cls=Base)  # type: ignore

47 

48 

class Job(Base):
    """ORM model for a row of the ``jobs`` table.

    In addition to the column mapping, the class can load the job's
    ``ExecuteResponse`` through a data store and convert itself into the
    in-memory ``job.Job`` representation.
    """

    __tablename__ = 'jobs'

    name = Column(String, primary_key=True)
    action_digest = Column(String, index=True, nullable=False)
    action = Column(LargeBinary)
    priority = Column(Integer, default=1, index=True, nullable=False)
    stage = Column(Integer, default=0, index=True, nullable=False)
    do_not_cache = Column(Boolean, default=False, nullable=False)
    cancelled = Column(Boolean, default=False, nullable=False)
    queued_timestamp = Column(DateTime, index=True)
    queued_time_duration = Column(Integer)
    worker_start_timestamp = Column(DateTime)
    worker_completed_timestamp = Column(DateTime)
    result = Column(String)
    assigned = Column(Boolean, default=False)
    n_tries = Column(Integer, default=0)
    platform_requirements = Column(String, nullable=True)
    status_code = Column(Integer, nullable=True)
    stdout_stream_name = Column(String, nullable=True)
    stdout_stream_write_name = Column(String, nullable=True)
    stderr_stream_name = Column(String, nullable=True)
    stderr_stream_write_name = Column(String, nullable=True)

    leases: List = relationship('Lease', backref='job')
    # Lease states for which a lease is still considered active.
    active_states: List[int] = [
        LeaseState.UNSPECIFIED.value,
        LeaseState.PENDING.value,
        LeaseState.ACTIVE.value
    ]
    # The list above is interpolated into the textual join condition.
    active_leases: List = relationship(
        'Lease',
        primaryjoin=f'and_(Lease.job_name==Job.name, Lease.state.in_({active_states}))'
    )

    operations: List = relationship('Operation', backref='job')

    # One partial index per timestamp column. Both dialect-specific WHERE
    # options are attached to the same Index: a backend ignores the option of
    # the other dialect, so declaring one Index per dialect under a shared
    # name would register two same-named indexes on the table, one of them
    # unfiltered.
    __table_args__ = (
        Index('ix_worker_start_timestamp',
              worker_start_timestamp, unique=False,
              postgresql_where=(worker_start_timestamp.isnot(None)),
              sqlite_where=(worker_start_timestamp.isnot(None))),
        Index('ix_worker_completed_timestamp',
              worker_completed_timestamp, unique=False,
              postgresql_where=(worker_completed_timestamp.isnot(None)),
              sqlite_where=(worker_completed_timestamp.isnot(None))),
    )

    def get_execute_response_protobuf_from_result_digest(self, data_store):
        """Return the ``ExecuteResponse`` referenced by ``self.result``.

        The response is looked up in ``data_store.response_cache`` under the
        job name; on a miss it is fetched from ``data_store.storage`` and
        cached before being returned.

        Args:
            data_store: Object exposing ``response_cache`` and ``storage``.

        Returns:
            The cached ``ExecuteResponse``, or ``None`` when the job has no
            result digest recorded.

        Raises:
            NotFoundError: If the storage has no message for the digest.
        """
        if self.result is None:
            return None

        # Check if this was stored in the response_cache; if not, load and cache it
        if self.name not in data_store.response_cache:
            result_digest = string_to_digest(self.result)
            result_proto = data_store.storage.get_message(result_digest, ExecuteResponse)
            if result_proto is None:
                raise NotFoundError(f"The result for job name=[{self.name}] with "
                                    f"result_digest=[{result_digest}] does not exist in the storage.")
            data_store.response_cache[self.name] = result_proto

        # Return ExecuteResponse proto from response_cache
        return data_store.response_cache[self.name]

    def to_internal_job(self, data_store, no_result=False, action_browser_url=None, instance_name=None):
        """Build the in-memory ``job.Job`` equivalent of this row.

        Args:
            data_store: Data store used to load the result when requested.
            no_result: When true, the ``ExecuteResponse`` is not loaded and
                the internal job gets ``result=None``.
            action_browser_url: Optional base URL; when not ``None`` an
                action URL is set on the returned job.
            instance_name: Instance name passed to ``BrowserURL`` together
                with ``action_browser_url``.

        Returns:
            The populated ``job.Job`` instance.

        Raises:
            NotFoundError: Propagated from the result lookup when the result
                is requested but missing from storage.
        """
        # There should never be more than one active lease for a job. If we
        # have more than one for some reason, just take the first one.
        # TODO(SotK): Log some information here if there are multiple active
        # (ie. not completed or cancelled) leases.
        active_lease = self.active_leases[0] if self.active_leases else None
        lease = active_lease.to_protobuf() if active_lease is not None else None
        worker_name = active_lease.worker_name if active_lease is not None else None

        # Unset columns are represented by default-constructed protos.
        q_timestamp = Timestamp()
        if self.queued_timestamp:
            q_timestamp.FromDatetime(self.queued_timestamp)
        q_time_duration = Duration()
        if self.queued_time_duration:
            q_time_duration.FromSeconds(self.queued_time_duration)
        ws_timestamp = Timestamp()
        if self.worker_start_timestamp:
            ws_timestamp.FromDatetime(self.worker_start_timestamp)
        wc_timestamp = Timestamp()
        if self.worker_completed_timestamp:
            wc_timestamp.FromDatetime(self.worker_completed_timestamp)

        action_proto = None
        if self.action is not None:
            action_proto = Action()
            action_proto.ParseFromString(self.action)

        result_proto = None
        if not no_result:
            result_proto = self.get_execute_response_protobuf_from_result_digest(data_store)

        internal_job = job.Job(
            do_not_cache=self.do_not_cache,
            action=action_proto,
            action_digest=string_to_digest(self.action_digest),
            platform_requirements=self.platform_requirements,
            priority=self.priority,
            name=self.name,
            operation_names={op.name for op in self.operations},
            cancelled_operation_names={op.name for op in self.operations if op.cancelled},
            lease=lease,
            stage=OperationStage(self.stage),
            cancelled=self.cancelled,
            queued_timestamp=q_timestamp,
            queued_time_duration=q_time_duration,
            worker_start_timestamp=ws_timestamp,
            worker_completed_timestamp=wc_timestamp,
            result=result_proto,
            worker_name=worker_name,
            n_tries=self.n_tries,
            status_code=self.status_code,
            stdout_stream_name=self.stdout_stream_name,
            stdout_stream_write_name=self.stdout_stream_write_name,
            stderr_stream_name=self.stderr_stream_name,
            stderr_stream_write_name=self.stderr_stream_write_name
        )

        # Set action browser url if url and instance names are passed in
        if action_browser_url is not None:
            internal_job.set_action_url(BrowserURL(action_browser_url, instance_name))

        return internal_job

172 

173 

class Lease(Base):
    """ORM model for a row of the ``leases`` table."""

    __tablename__ = 'leases'

    id = Column(Integer, primary_key=True)
    job_name = Column(
        String,
        ForeignKey('jobs.name', ondelete='CASCADE', onupdate='CASCADE'),
        index=True,
        nullable=False,
    )
    status = Column(Integer)
    state = Column(Integer, nullable=False)
    worker_name = Column(String, index=True, default=None)

    def to_protobuf(self):
        """Build the ``bots_pb2.Lease`` message describing this lease.

        The lease id is the job name. The payload is the job's parsed
        ``Action`` when the job stores one, otherwise the action digest.

        Returns:
            The populated ``bots_pb2.Lease``.
        """
        lease_proto = bots_pb2.Lease()
        lease_proto.id = self.job_name

        serialized_action = self.job.action
        if serialized_action is None:
            payload = string_to_digest(self.job.action_digest)
        else:
            payload = Action()
            payload.ParseFromString(serialized_action)
        lease_proto.payload.Pack(payload)

        lease_proto.state = self.state
        # The status code is only set when one has been recorded.
        if self.status is not None:
            lease_proto.status.code = self.status
        return lease_proto

199 

200 

class Operation(Base):
    """ORM model for a row of the ``operations`` table."""

    __tablename__ = 'operations'

    name = Column(String, primary_key=True)
    job_name = Column(String, ForeignKey('jobs.name', ondelete='CASCADE', onupdate='CASCADE'),
                      index=True, nullable=False)
    cancelled = Column(Boolean, default=False, nullable=False)
    tool_name = Column(String, nullable=True)
    tool_version = Column(String, nullable=True)
    invocation_id = Column(String, nullable=True)
    correlated_invocations_id = Column(String, nullable=True)

    def to_protobuf(self, data_store, no_result=False):
        """Returns the protobuf representation of the operation.

        When expecting a result, if the stored `ExecuteResponse` message
        is missing from `data_store`, populates the `error` field of the
        operation with `code_pb2.DATA_LOSS`.

        Args:
            data_store: Data store used to load the job's result.
            no_result: When true, the response is not loaded or packed.

        Returns:
            The populated ``operations_pb2.Operation``.
        """
        operation = operations_pb2.Operation()
        operation.name = self.name
        operation.done = self.job.stage == OperationStage.COMPLETED.value or self.cancelled
        operation.metadata.Pack(ExecuteOperationMetadata(
            stage=self.job.stage,
            action_digest=string_to_digest(self.job.action_digest)))

        status_code = self.job.status_code
        if self.cancelled:
            operation.error.CopyFrom(status_pb2.Status(code=code_pb2.CANCELLED))
        elif status_code is not None and status_code != code_pb2.OK:
            # `status_code` is nullable: a job without a recorded status must
            # not be reported as failed, so `error` is left unset for it.
            operation.error.CopyFrom(status_pb2.Status(code=status_code))

        if self.job.result and not no_result:
            try:
                execute_response = self.job.get_execute_response_protobuf_from_result_digest(data_store)
                operation.response.Pack(execute_response)
            except NotFoundError:
                operation.error.CopyFrom(status_pb2.Status(code=code_pb2.DATA_LOSS))

        return operation

240 

241 

class IndexEntry(Base):
    """ORM model for a row of the ``index`` table, keyed by digest hash."""

    __tablename__ = 'index'

    digest_hash = Column(
        String,
        primary_key=True,
        index=True,
        nullable=False,
    )
    digest_size_bytes = Column(BigInteger, nullable=False)
    accessed_timestamp = Column(DateTime, nullable=False, index=True)
    # Defaulted by the database (``server_default``) rather than in Python.
    deleted = Column(Boolean, server_default=false(), nullable=False)
    inline_blob = Column(LargeBinary, nullable=True)

250 

251 

def digest_to_string(digest):
    """Serialise a digest as ``"<hash>/<size_bytes>"``.

    Args:
        digest: Object exposing ``hash`` and ``size_bytes`` attributes.

    Returns:
        The two attributes joined by a single ``/``.
    """
    return '{}/{}'.format(digest.hash, digest.size_bytes)

254 

255 

def string_to_digest(string):
    """Parse a ``"<hash>/<size_bytes>"`` string into a ``Digest`` message.

    Args:
        string: Text split at its first ``/``; the part after it must be
            an integer literal.

    Returns:
        A ``Digest`` with ``hash`` and ``size_bytes`` populated.

    Raises:
        ValueError: If ``string`` has no ``/`` or the size is not an integer.
    """
    hash_part, size_part = string.split('/', 1)
    size = int(size_part)
    return Digest(hash=hash_part, size_bytes=size)

258 return Digest(hash=digest_hash, size_bytes=int(size_bytes))