Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/persistence/sql/models.py: 99.28% (139 statements)

# Copyright (C) 2019 Bloomberg LP
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     <http://www.apache.org/licenses/LICENSE-2.0>
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# pylint: disable=multiple-statements

from typing import List

from google.protobuf.duration_pb2 import Duration
from google.protobuf.timestamp_pb2 import Timestamp
from sqlalchemy import BigInteger, Boolean, Column, DateTime, ForeignKey, Index, Integer, LargeBinary, String, false
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship

from buildgrid.utils import BrowserURL

from ...._enums import LeaseState, OperationStage
from ...._exceptions import NotFoundError
from ...._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action, Digest
from ...._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteOperationMetadata
from ...._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import ExecuteResponse
from ...._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
from ...._protos.google.rpc import code_pb2, status_pb2
from ...._protos.google.longrunning import operations_pb2

from ... import job


class Base:
    """Base class which implements functionality relevant to all models."""

    def update(self, changes):
        for key, val in changes.items():
            setattr(self, key, val)


Base = declarative_base(cls=Base)  # type: ignore
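
# A minimal usage sketch of the shared ``update`` helper defined on the
# declarative base above (``job_record`` and the field values are
# hypothetical, for illustration only):
#
#     job_record.update({'stage': OperationStage.EXECUTING.value,
#                        'assigned': True})
#
# Each key/value pair is applied with ``setattr``, so keys must match the
# column attribute names on the model.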


class Job(Base):
    __tablename__ = 'jobs'

    name = Column(String, primary_key=True)
    action_digest = Column(String, index=True, nullable=False)
    action = Column(LargeBinary)
    priority = Column(Integer, default=1, index=True, nullable=False)
    stage = Column(Integer, default=0, index=True, nullable=False)
    do_not_cache = Column(Boolean, default=False, nullable=False)
    cancelled = Column(Boolean, default=False, nullable=False)
    queued_timestamp = Column(DateTime, index=True)
    queued_time_duration = Column(Integer)
    worker_start_timestamp = Column(DateTime)
    worker_completed_timestamp = Column(DateTime)
    result = Column(String)
    assigned = Column(Boolean, default=False)
    n_tries = Column(Integer, default=0)
    platform_requirements = Column(String, nullable=True)
    status_code = Column(Integer, nullable=True)
    stdout_stream_name = Column(String, nullable=True)
    stdout_stream_write_name = Column(String, nullable=True)
    stderr_stream_name = Column(String, nullable=True)
    stderr_stream_write_name = Column(String, nullable=True)

    leases: List = relationship('Lease', backref='job')
    active_states: List[int] = [
        LeaseState.UNSPECIFIED.value,
        LeaseState.PENDING.value,
        LeaseState.ACTIVE.value
    ]
    active_leases: List = relationship(
        'Lease',
        primaryjoin=f'and_(Lease.job_name==Job.name, Lease.state.in_({active_states}))'
    )
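
    # Note that the f-string above bakes the ``active_states`` values into
    # the join condition at class-definition time, producing a condition
    # along the lines of (exact values depend on the LeaseState enum):
    #
    #     and_(Lease.job_name==Job.name, Lease.state.in_([0, 1, 2]))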

    operations: List = relationship('Operation', backref='job')

    __table_args__ = (
        Index('ix_worker_start_timestamp',
              worker_start_timestamp,
              unique=False,
              postgresql_where=(worker_start_timestamp.isnot(None)),
              sqlite_where=(worker_start_timestamp.isnot(None))),
        Index('ix_worker_completed_timestamp',
              worker_completed_timestamp,
              unique=False,
              postgresql_where=(worker_completed_timestamp.isnot(None)),
              sqlite_where=(worker_completed_timestamp.isnot(None)))
    )
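
    # These are partial indexes: on PostgreSQL and SQLite the WHERE clause
    # restricts each index to rows whose timestamp is set. The DDL emitted
    # for PostgreSQL should look roughly like (a sketch, not exact output):
    #
    #     CREATE INDEX ix_worker_start_timestamp ON jobs
    #         (worker_start_timestamp)
    #         WHERE worker_start_timestamp IS NOT NULL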

    def get_execute_response_protobuf_from_result_digest(self, data_store):
        if self.result is None:
            return None

        # Check if this was stored in the response_cache; if not, load and cache it
        if self.name not in data_store.response_cache:
            result_digest = string_to_digest(self.result)
            result_proto = data_store.storage.get_message(result_digest, ExecuteResponse)
            if result_proto is None:
                raise NotFoundError(f"The result for job name=[{self.name}] with "
                                    f"result_digest=[{result_digest}] does not exist in storage.")
            data_store.response_cache[self.name] = result_proto

        # Return the ExecuteResponse proto from the response_cache
        return data_store.response_cache[self.name]

    def to_internal_job(self, data_store, no_result=False, action_browser_url=None, instance_name=None):
        # There should never be more than one active lease for a job. If we
        # have more than one for some reason, just take the first one.
        # TODO(SotK): Log some information here if there are multiple active
        # (i.e. not completed or cancelled) leases.
        lease = self.active_leases[0].to_protobuf() if self.active_leases else None
        q_timestamp = Timestamp()
        if self.queued_timestamp:
            q_timestamp.FromDatetime(self.queued_timestamp)
        q_time_duration = Duration()
        if self.queued_time_duration:
            q_time_duration.FromSeconds(self.queued_time_duration)
        ws_timestamp = Timestamp()
        if self.worker_start_timestamp:
            ws_timestamp.FromDatetime(self.worker_start_timestamp)
        wc_timestamp = Timestamp()
        if self.worker_completed_timestamp:
            wc_timestamp.FromDatetime(self.worker_completed_timestamp)

        action_proto = None
        if self.action is not None:
            action_proto = Action()
            action_proto.ParseFromString(self.action)

        result_proto = None
        if not no_result:
            result_proto = self.get_execute_response_protobuf_from_result_digest(data_store)

        internal_job = job.Job(
            do_not_cache=self.do_not_cache,
            action=action_proto,
            action_digest=string_to_digest(self.action_digest),
            platform_requirements=self.platform_requirements,
            priority=self.priority,
            name=self.name,
            operation_names=set(op.name for op in self.operations),
            cancelled_operation_names=set(op.name for op in self.operations if op.cancelled),
            lease=lease,
            stage=OperationStage(self.stage),
            cancelled=self.cancelled,
            queued_timestamp=q_timestamp,
            queued_time_duration=q_time_duration,
            worker_start_timestamp=ws_timestamp,
            worker_completed_timestamp=wc_timestamp,
            result=result_proto,
            worker_name=self.active_leases[0].worker_name if self.active_leases else None,
            n_tries=self.n_tries,
            status_code=self.status_code,
            stdout_stream_name=self.stdout_stream_name,
            stdout_stream_write_name=self.stdout_stream_write_name,
            stderr_stream_name=self.stderr_stream_name,
            stderr_stream_write_name=self.stderr_stream_write_name
        )

        # Set the action browser URL if a base URL and instance name are passed in
        if action_browser_url is not None:
            internal_job.set_action_url(BrowserURL(action_browser_url, instance_name))

        return internal_job
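
# A rough usage sketch (``session`` and ``data_store`` are assumed to come
# from the surrounding persistence layer; they are not defined here):
#
#     record = session.query(Job).filter_by(name=job_name).one()
#     internal = record.to_internal_job(data_store)
#
# ``to_internal_job`` rehydrates the ORM row into the in-memory ``job.Job``
# used by the scheduler.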


class Lease(Base):
    __tablename__ = 'leases'

    id = Column(Integer, primary_key=True)
    job_name = Column(String, ForeignKey('jobs.name', ondelete='CASCADE', onupdate='CASCADE'),
                      index=True, nullable=False)
    status = Column(Integer)
    state = Column(Integer, nullable=False)
    worker_name = Column(String, index=True, default=None)

    def to_protobuf(self):
        lease = bots_pb2.Lease()
        lease.id = self.job_name

        if self.job.action is not None:
            action = Action()
            action.ParseFromString(self.job.action)
            lease.payload.Pack(action)
        else:
            lease.payload.Pack(string_to_digest(self.job.action_digest))

        lease.state = self.state
        if self.status is not None:
            lease.status.code = self.status
        return lease
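
# The lease payload is a ``google.protobuf.Any`` holding either the full
# Action or just its Digest. A consumer would branch on the packed type,
# e.g. (a sketch, not code from this module):
#
#     action = Action()
#     if lease.payload.Is(Action.DESCRIPTOR):
#         lease.payload.Unpack(action)
#     else:
#         digest = Digest()
#         lease.payload.Unpack(digest)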


class Operation(Base):
    __tablename__ = 'operations'

    name = Column(String, primary_key=True)
    job_name = Column(String, ForeignKey('jobs.name', ondelete='CASCADE', onupdate='CASCADE'),
                      index=True, nullable=False)
    cancelled = Column(Boolean, default=False, nullable=False)
    tool_name = Column(String, nullable=True)
    tool_version = Column(String, nullable=True)
    invocation_id = Column(String, nullable=True)
    correlated_invocations_id = Column(String, nullable=True)

    def to_protobuf(self, data_store, no_result=False):
        """Returns the protobuf representation of the operation.

        When expecting a result, if the `ExecuteResponse` message is
        missing from `data_store`, populates the `error` field of the
        operation with `code_pb2.DATA_LOSS`.
        """
        operation = operations_pb2.Operation()
        operation.name = self.name
        operation.done = self.job.stage == OperationStage.COMPLETED.value or self.cancelled
        operation.metadata.Pack(ExecuteOperationMetadata(
            stage=self.job.stage,
            action_digest=string_to_digest(self.job.action_digest)))

        if self.cancelled:
            operation.error.CopyFrom(status_pb2.Status(code=code_pb2.CANCELLED))
        elif self.job.status_code != code_pb2.OK:
            operation.error.CopyFrom(status_pb2.Status(code=self.job.status_code))

        if self.job.result and not no_result:
            try:
                execute_response = self.job.get_execute_response_protobuf_from_result_digest(data_store)
                operation.response.Pack(execute_response)
            except NotFoundError:
                operation.error.CopyFrom(status_pb2.Status(code=code_pb2.DATA_LOSS))

        return operation


class IndexEntry(Base):
    __tablename__ = 'index'

    digest_hash = Column(String, nullable=False, index=True, primary_key=True)
    digest_size_bytes = Column(BigInteger, nullable=False)
    accessed_timestamp = Column(DateTime, index=True, nullable=False)
    deleted = Column(Boolean, nullable=False, server_default=false())
    inline_blob = Column(LargeBinary, nullable=True)


def digest_to_string(digest):
    """Serialize a Digest to the 'hash/size_bytes' form stored in the database."""
    return f'{digest.hash}/{digest.size_bytes}'


def string_to_digest(string):
    """Parse a 'hash/size_bytes' string back into a Digest message."""
    digest_hash, size_bytes = string.split('/', 1)
    return Digest(hash=digest_hash, size_bytes=int(size_bytes))
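
# Round-trip sketch (hypothetical hash value):
#
#     d = Digest(hash='deadbeef', size_bytes=42)
#     s = digest_to_string(d)          # 'deadbeef/42'
#     assert string_to_digest(s) == d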