Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/sql/models.py: 97.71%
131 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-10-04 17:48 +0000
1# Copyright (C) 2019 Bloomberg LP
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# <http://www.apache.org/licenses/LICENSE-2.0>
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14import datetime
15from typing import List, Optional
17from sqlalchemy import (
18 BigInteger,
19 Boolean,
20 Column,
21 DateTime,
22 ForeignKey,
23 Index,
24 Integer,
25 LargeBinary,
26 String,
27 Table,
28 UniqueConstraint,
29 false,
30)
31from sqlalchemy.ext.declarative import declarative_base
32from sqlalchemy.orm import relationship
34from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action, Digest
35from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2
36from buildgrid.server.enums import LeaseState
38Base = declarative_base()
41job_platform_association = Table(
42 "job_platforms",
43 Base.metadata,
44 Column("job_name", ForeignKey("jobs.name", ondelete="CASCADE", onupdate="CASCADE"), primary_key=True),
45 Column("platform_id", ForeignKey("platform_properties.id"), primary_key=True),
46)
49class PlatformEntry(Base):
50 __tablename__ = "platform_properties"
51 __table_args__ = (UniqueConstraint("key", "value"),)
53 id: int = Column(Integer, autoincrement=True, primary_key=True)
54 key: str = Column(String)
55 value: str = Column(String)
57 jobs: List["JobEntry"] = relationship("JobEntry", secondary=job_platform_association, back_populates="platform")
60class JobEntry(Base):
61 __tablename__ = "jobs"
63 # Immutable data
64 name: str = Column(String, primary_key=True)
65 instance_name: str = Column(String, index=True, nullable=False)
66 action_digest: str = Column(String, index=True, nullable=False)
67 action: bytes = Column(LargeBinary, nullable=False)
68 do_not_cache: bool = Column(Boolean, default=False, nullable=False)
69 # This is a hash of the platform properties, used for matching jobs to workers
70 platform_requirements: str = Column(String, nullable=False)
71 property_label: str = Column(String, nullable=False, server_default="unknown")
72 command: str = Column(String, nullable=False)
74 # Scheduling state
75 stage: int = Column(Integer, default=0, nullable=False)
76 priority: int = Column(Integer, default=1, index=True, nullable=False)
77 cancelled: bool = Column(Boolean, default=False, nullable=False)
78 assigned: bool = Column(Boolean, default=False, nullable=False)
79 n_tries: int = Column(Integer, default=0, nullable=False)
81 # Return data on completion
82 result: Optional[str] = Column(String, nullable=True)
83 status_code: Optional[int] = Column(Integer, nullable=True)
85 # Auditing data
86 create_timestamp: Optional[datetime.datetime] = Column(DateTime, nullable=True)
87 queued_timestamp: datetime.datetime = Column(DateTime, index=True, nullable=False)
88 queued_time_duration: Optional[int] = Column(Integer, nullable=True)
89 worker_start_timestamp: Optional[datetime.datetime] = Column(DateTime, nullable=True)
90 worker_completed_timestamp: Optional[datetime.datetime] = Column(DateTime, nullable=True)
92 # Logstream identifiers
93 stdout_stream_name: Optional[str] = Column(String, nullable=True)
94 stdout_stream_write_name: Optional[str] = Column(String, nullable=True)
95 stderr_stream_name: Optional[str] = Column(String, nullable=True)
96 stderr_stream_write_name: Optional[str] = Column(String, nullable=True)
98 leases: List["LeaseEntry"] = relationship("LeaseEntry", backref="job")
99 active_states: List[int] = [
100 LeaseState.UNSPECIFIED.value,
101 LeaseState.PENDING.value,
102 LeaseState.ACTIVE.value,
103 LeaseState.CANCELLED.value,
104 ]
105 active_leases: List["LeaseEntry"] = relationship(
106 "LeaseEntry",
107 primaryjoin=f"and_(LeaseEntry.job_name==JobEntry.name, LeaseEntry.state.in_({active_states}))",
108 order_by="LeaseEntry.id.desc()",
109 overlaps="job,leases",
110 )
112 operations: List["OperationEntry"] = relationship("OperationEntry", backref="job")
114 platform: List["PlatformEntry"] = relationship(
115 "PlatformEntry", secondary=job_platform_association, back_populates="jobs"
116 )
118 __table_args__ = (
119 Index(
120 "ix_worker_start_timestamp",
121 worker_start_timestamp,
122 unique=False,
123 postgresql_where=worker_start_timestamp.isnot(None),
124 sqlite_where=worker_start_timestamp.isnot(None),
125 ),
126 Index(
127 "ix_worker_completed_timestamp",
128 worker_completed_timestamp,
129 unique=False,
130 postgresql_where=worker_completed_timestamp.isnot(None),
131 sqlite_where=worker_completed_timestamp.isnot(None),
132 ),
133 Index(
134 "ix_jobs_stage_property_label",
135 stage,
136 property_label,
137 unique=False,
138 ),
139 )
142class LeaseEntry(Base):
143 __tablename__ = "leases"
145 job: JobEntry
146 id: int = Column(Integer, primary_key=True)
147 job_name: str = Column(
148 String, ForeignKey("jobs.name", ondelete="CASCADE", onupdate="CASCADE"), index=True, nullable=False
149 )
150 status: Optional[int] = Column(Integer)
151 state: int = Column(Integer, nullable=False)
152 worker_name: Optional[str] = Column(String, index=True, default=None)
154 def to_protobuf(self) -> bots_pb2.Lease:
155 lease = bots_pb2.Lease()
156 lease.id = self.job_name
158 if self.job.action is not None:
159 action = Action()
160 action.ParseFromString(self.job.action)
161 lease.payload.Pack(action)
162 else:
163 lease.payload.Pack(string_to_digest(self.job.action_digest))
165 lease.state = self.state # type: ignore[assignment]
166 if self.status is not None:
167 lease.status.code = self.status
168 return lease
171class ClientIdentityEntry(Base):
172 __tablename__ = "client_identities"
173 __table_args__ = (UniqueConstraint("instance", "workflow", "actor", "subject"),)
175 id: int = Column(Integer, primary_key=True, autoincrement=True)
176 instance: str = Column(String, nullable=False)
177 workflow: str = Column(String, nullable=False)
178 actor: str = Column(String, nullable=False)
179 subject: str = Column(String, nullable=False)
181 def __str__(self) -> str:
182 return (
183 f"ClientIdentity: [instance={self.instance} workflow={self.workflow}"
184 f" actor={self.actor} subject={self.subject}]"
185 )
188class RequestMetadataEntry(Base):
189 __tablename__ = "request_metadata"
190 __table_args__ = (
191 UniqueConstraint(
192 "tool_name",
193 "tool_version",
194 "invocation_id",
195 "correlated_invocations_id",
196 "action_mnemonic",
197 "target_id",
198 "configuration_id",
199 name="unique_metadata_constraint",
200 ),
201 )
203 id: int = Column(Integer, primary_key=True, autoincrement=True)
204 tool_name: Optional[str] = Column(String, nullable=True)
205 tool_version: Optional[str] = Column(String, nullable=True)
206 invocation_id: Optional[str] = Column(String, nullable=True)
207 correlated_invocations_id: Optional[str] = Column(String, nullable=True)
208 action_mnemonic: Optional[str] = Column(String, nullable=True)
209 target_id: Optional[str] = Column(String, nullable=True)
210 configuration_id: Optional[str] = Column(String, nullable=True)
213class OperationEntry(Base):
214 __tablename__ = "operations"
216 job: JobEntry
217 name: str = Column(String, primary_key=True)
218 job_name: str = Column(
219 String, ForeignKey("jobs.name", ondelete="CASCADE", onupdate="CASCADE"), index=True, nullable=False
220 )
221 cancelled: bool = Column(Boolean, default=False, nullable=False)
222 tool_name: Optional[str] = Column(String, nullable=True)
223 tool_version: Optional[str] = Column(String, nullable=True)
224 invocation_id: Optional[str] = Column(String, nullable=True)
225 correlated_invocations_id: Optional[str] = Column(String, nullable=True)
227 client_identity_id: Optional[int] = Column(Integer, ForeignKey("client_identities.id"), nullable=True)
228 client_identity: Optional[ClientIdentityEntry] = relationship("ClientIdentityEntry")
230 request_metadata_id: Optional[int] = Column(Integer, ForeignKey(RequestMetadataEntry.id), nullable=True)
231 request_metadata: Optional[RequestMetadataEntry] = relationship(RequestMetadataEntry)
234class IndexEntry(Base):
235 __tablename__ = "index"
237 digest_hash: str = Column(String, nullable=False, index=True, primary_key=True)
238 digest_size_bytes: int = Column(BigInteger, nullable=False)
239 accessed_timestamp: datetime.datetime = Column(DateTime, index=True, nullable=False)
240 deleted: bool = Column(Boolean, nullable=False, server_default=false())
241 inline_blob: Optional[bytes] = Column(LargeBinary, nullable=True)
244# This table is used to store the bot session state. It also stores the
245# assigned leases, instead of making use of the 'leases' table through an
246# SQLAlchemy relationship, as the 'leases' table is dependent on the type of
247# data store selected, and might never be populated.
248class BotEntry(Base):
249 __tablename__ = "bots"
251 # Immutable data
252 name: str = Column(String, nullable=False, index=True, primary_key=True)
253 bot_id: str = Column(String, nullable=False, index=True)
254 instance_name: str = Column(String, nullable=False)
256 # Scheduling state
257 bot_status: int = Column(Integer, nullable=False)
258 lease_id: Optional[str] = Column(String, nullable=True)
260 # Auditing data
261 expiry_time: datetime.datetime = Column(DateTime, index=True, nullable=False)
262 last_update_timestamp: datetime.datetime = Column(DateTime, index=True, nullable=False)
264 job: Optional[JobEntry] = relationship(JobEntry, primaryjoin="foreign(BotEntry.lease_id) == JobEntry.name")
267# This table is used by the SQLStorage CAS backend to store blobs
268# in a database.
269class BlobEntry(Base):
270 __tablename__ = "blobs"
272 digest_hash: str = Column(String, primary_key=True)
273 digest_size_bytes: int = Column(BigInteger, nullable=False)
274 data: bytes = Column(LargeBinary, nullable=False)
277def digest_to_string(digest: Digest) -> str:
278 return f"{digest.hash}/{digest.size_bytes}"
281def string_to_digest(string: str) -> Digest:
282 digest_hash, size_bytes = string.split("/", 1)
283 return Digest(hash=digest_hash, size_bytes=int(size_bytes))