Coverage for /builds/BuildGrid/buildgrid/buildgrid/server/sql/models.py: 97.71%

131 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-10-04 17:48 +0000

1# Copyright (C) 2019 Bloomberg LP 

2# 

3# Licensed under the Apache License, Version 2.0 (the "License"); 

4# you may not use this file except in compliance with the License. 

5# You may obtain a copy of the License at 

6# 

7# <http://www.apache.org/licenses/LICENSE-2.0> 

8# 

9# Unless required by applicable law or agreed to in writing, software 

10# distributed under the License is distributed on an "AS IS" BASIS, 

11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 

12# See the License for the specific language governing permissions and 

13# limitations under the License. 

14import datetime 

15from typing import List, Optional 

16 

17from sqlalchemy import ( 

18 BigInteger, 

19 Boolean, 

20 Column, 

21 DateTime, 

22 ForeignKey, 

23 Index, 

24 Integer, 

25 LargeBinary, 

26 String, 

27 Table, 

28 UniqueConstraint, 

29 false, 

30) 

31from sqlalchemy.ext.declarative import declarative_base 

32from sqlalchemy.orm import relationship 

33 

34from buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2 import Action, Digest 

35from buildgrid._protos.google.devtools.remoteworkers.v1test2 import bots_pb2 

36from buildgrid.server.enums import LeaseState 

37 

# Declarative base class shared by every ORM model defined in this module.
Base = declarative_base()


# Association table used as the ``secondary`` of the many-to-many relationship
# between JobEntry and PlatformEntry. Each row pairs one job name with one
# platform property id; together they form the composite primary key.
job_platform_association = Table(
    "job_platforms",
    Base.metadata,
    Column(
        "job_name",
        ForeignKey("jobs.name", ondelete="CASCADE", onupdate="CASCADE"),
        primary_key=True,
    ),
    Column(
        "platform_id",
        ForeignKey("platform_properties.id"),
        primary_key=True,
    ),
)

47 

48 

class PlatformEntry(Base):
    """ORM model for the ``platform_properties`` table.

    Each row holds a single key/value pair; the unique constraint ensures a
    given pair is stored only once.
    """

    __tablename__ = "platform_properties"
    __table_args__ = (UniqueConstraint("key", "value"),)

    id: int = Column(Integer, primary_key=True, autoincrement=True)
    key: str = Column(String)
    value: str = Column(String)

    # Jobs linked to this property through the ``job_platforms`` association
    # table; mirrors ``JobEntry.platform``.
    jobs: List["JobEntry"] = relationship(
        "JobEntry",
        secondary=job_platform_association,
        back_populates="platform",
    )

58 

59 

class JobEntry(Base):
    """ORM model for the ``jobs`` table.

    Groups the columns describing a job (identity, scheduling state, result,
    audit timestamps and logstream names) together with its relationships to
    leases, operations and platform properties.
    """

    __tablename__ = "jobs"

    # --- Immutable data ---
    name: str = Column(String, primary_key=True)
    instance_name: str = Column(String, nullable=False, index=True)
    action_digest: str = Column(String, nullable=False, index=True)
    action: bytes = Column(LargeBinary, nullable=False)
    do_not_cache: bool = Column(Boolean, nullable=False, default=False)
    # Hash of the platform properties; used when matching jobs to workers.
    platform_requirements: str = Column(String, nullable=False)
    property_label: str = Column(String, nullable=False, server_default="unknown")
    command: str = Column(String, nullable=False)

    # --- Scheduling state ---
    stage: int = Column(Integer, nullable=False, default=0)
    priority: int = Column(Integer, nullable=False, default=1, index=True)
    cancelled: bool = Column(Boolean, nullable=False, default=False)
    assigned: bool = Column(Boolean, nullable=False, default=False)
    n_tries: int = Column(Integer, nullable=False, default=0)

    # --- Data returned on completion ---
    result: Optional[str] = Column(String, nullable=True)
    status_code: Optional[int] = Column(Integer, nullable=True)

    # --- Auditing data ---
    create_timestamp: Optional[datetime.datetime] = Column(DateTime, nullable=True)
    queued_timestamp: datetime.datetime = Column(DateTime, nullable=False, index=True)
    queued_time_duration: Optional[int] = Column(Integer, nullable=True)
    worker_start_timestamp: Optional[datetime.datetime] = Column(DateTime, nullable=True)
    worker_completed_timestamp: Optional[datetime.datetime] = Column(DateTime, nullable=True)

    # --- Logstream identifiers ---
    stdout_stream_name: Optional[str] = Column(String, nullable=True)
    stdout_stream_write_name: Optional[str] = Column(String, nullable=True)
    stderr_stream_name: Optional[str] = Column(String, nullable=True)
    stderr_stream_write_name: Optional[str] = Column(String, nullable=True)

    # Every lease recorded for this job; the backref exposes ``LeaseEntry.job``.
    leases: List["LeaseEntry"] = relationship("LeaseEntry", backref="job")

    # Lease state values that count as "active". The list is interpolated
    # verbatim into the ``primaryjoin`` string of ``active_leases`` below, so
    # it has to remain a plain list of ints defined before that relationship.
    active_states: List[int] = [
        LeaseState.UNSPECIFIED.value,
        LeaseState.PENDING.value,
        LeaseState.ACTIVE.value,
        LeaseState.CANCELLED.value,
    ]
    # Leases of this job whose state is one of ``active_states``, ordered by
    # descending lease id.
    active_leases: List["LeaseEntry"] = relationship(
        "LeaseEntry",
        primaryjoin=f"and_(LeaseEntry.job_name==JobEntry.name, LeaseEntry.state.in_({active_states}))",
        order_by="LeaseEntry.id.desc()",
        overlaps="job,leases",
    )

    # Operations attached to this job; the backref exposes ``OperationEntry.job``.
    operations: List["OperationEntry"] = relationship("OperationEntry", backref="job")

    # Platform properties linked through the ``job_platforms`` association
    # table; mirrors ``PlatformEntry.jobs``.
    platform: List["PlatformEntry"] = relationship(
        "PlatformEntry",
        secondary=job_platform_association,
        back_populates="jobs",
    )

    __table_args__ = (
        # Partial index: only rows with a non-NULL worker start timestamp.
        Index(
            "ix_worker_start_timestamp",
            worker_start_timestamp,
            unique=False,
            postgresql_where=worker_start_timestamp.isnot(None),
            sqlite_where=worker_start_timestamp.isnot(None),
        ),
        # Partial index: only rows with a non-NULL worker completion timestamp.
        Index(
            "ix_worker_completed_timestamp",
            worker_completed_timestamp,
            unique=False,
            postgresql_where=worker_completed_timestamp.isnot(None),
            sqlite_where=worker_completed_timestamp.isnot(None),
        ),
        # Composite index over (stage, property_label).
        Index(
            "ix_jobs_stage_property_label",
            stage,
            property_label,
            unique=False,
        ),
    )

140 

141 

class LeaseEntry(Base):
    """ORM model for the ``leases`` table; each lease references one job by name."""

    __tablename__ = "leases"

    # Annotation only: the attribute itself is supplied by the ``backref="job"``
    # declared on ``JobEntry.leases``.
    job: JobEntry
    id: int = Column(Integer, primary_key=True)
    job_name: str = Column(
        String,
        ForeignKey("jobs.name", ondelete="CASCADE", onupdate="CASCADE"),
        index=True,
        nullable=False,
    )
    status: Optional[int] = Column(Integer)
    state: int = Column(Integer, nullable=False)
    worker_name: Optional[str] = Column(String, index=True, default=None)

    def to_protobuf(self) -> bots_pb2.Lease:
        """Build a ``bots_pb2.Lease`` message from this row.

        The lease id is the job name. The payload is the job's parsed
        ``Action`` when serialized action bytes are present, otherwise the
        ``Digest`` parsed from the job's ``action_digest`` string. The status
        code is only set when ``status`` is not None.

        Returns:
            The populated lease message.
        """
        lease = bots_pb2.Lease()
        lease.id = self.job_name

        serialized_action = self.job.action
        if serialized_action is None:
            # No stored action bytes: fall back to packing the action digest.
            lease.payload.Pack(string_to_digest(self.job.action_digest))
        else:
            action = Action()
            action.ParseFromString(serialized_action)
            lease.payload.Pack(action)

        lease.state = self.state  # type: ignore[assignment]

        status_code = self.status
        if status_code is not None:
            lease.status.code = status_code
        return lease

169 

170 

class ClientIdentityEntry(Base):
    """ORM model for the ``client_identities`` table.

    The combination (instance, workflow, actor, subject) is unique.
    """

    __tablename__ = "client_identities"
    __table_args__ = (UniqueConstraint("instance", "workflow", "actor", "subject"),)

    id: int = Column(Integer, autoincrement=True, primary_key=True)
    instance: str = Column(String, nullable=False)
    workflow: str = Column(String, nullable=False)
    actor: str = Column(String, nullable=False)
    subject: str = Column(String, nullable=False)

    def __str__(self) -> str:
        """Return a one-line summary listing the four identity fields."""
        description = (
            f"ClientIdentity: [instance={self.instance} workflow={self.workflow}"
            f" actor={self.actor} subject={self.subject}]"
        )
        return description

186 

187 

class RequestMetadataEntry(Base):
    """ORM model for the ``request_metadata`` table.

    All metadata columns are nullable strings; the named constraint
    ``unique_metadata_constraint`` covers the seven of them together.
    """

    __tablename__ = "request_metadata"
    __table_args__ = (
        UniqueConstraint(
            "tool_name",
            "tool_version",
            "invocation_id",
            "correlated_invocations_id",
            "action_mnemonic",
            "target_id",
            "configuration_id",
            name="unique_metadata_constraint",
        ),
    )

    id: int = Column(Integer, autoincrement=True, primary_key=True)
    tool_name: Optional[str] = Column(String, nullable=True)
    tool_version: Optional[str] = Column(String, nullable=True)
    invocation_id: Optional[str] = Column(String, nullable=True)
    correlated_invocations_id: Optional[str] = Column(String, nullable=True)
    action_mnemonic: Optional[str] = Column(String, nullable=True)
    target_id: Optional[str] = Column(String, nullable=True)
    configuration_id: Optional[str] = Column(String, nullable=True)

211 

212 

class OperationEntry(Base):
    """ORM model for the ``operations`` table.

    Each operation references one job by name and may optionally reference a
    client identity row and a request metadata row.
    """

    __tablename__ = "operations"

    # Annotation only: the attribute itself is supplied by the ``backref="job"``
    # declared on ``JobEntry.operations``.
    job: JobEntry
    name: str = Column(String, primary_key=True)
    job_name: str = Column(
        String,
        ForeignKey("jobs.name", ondelete="CASCADE", onupdate="CASCADE"),
        index=True,
        nullable=False,
    )
    cancelled: bool = Column(Boolean, nullable=False, default=False)
    tool_name: Optional[str] = Column(String, nullable=True)
    tool_version: Optional[str] = Column(String, nullable=True)
    invocation_id: Optional[str] = Column(String, nullable=True)
    correlated_invocations_id: Optional[str] = Column(String, nullable=True)

    # Optional link to a row of ``client_identities``.
    client_identity_id: Optional[int] = Column(
        Integer,
        ForeignKey("client_identities.id"),
        nullable=True,
    )
    client_identity: Optional[ClientIdentityEntry] = relationship("ClientIdentityEntry")

    # Optional link to a row of ``request_metadata``.
    request_metadata_id: Optional[int] = Column(
        Integer,
        ForeignKey(RequestMetadataEntry.id),
        nullable=True,
    )
    request_metadata: Optional[RequestMetadataEntry] = relationship(RequestMetadataEntry)

232 

233 

class IndexEntry(Base):
    """ORM model for the ``index`` table, keyed by digest hash.

    NOTE(review): presumably the index of blobs held by a storage backend;
    confirm against the code that reads and writes this table.
    """

    __tablename__ = "index"

    digest_hash: str = Column(String, primary_key=True, index=True, nullable=False)
    digest_size_bytes: int = Column(BigInteger, nullable=False)
    accessed_timestamp: datetime.datetime = Column(DateTime, nullable=False, index=True)
    # The default is applied by the database server (``server_default``),
    # not by the ORM on the Python side.
    deleted: bool = Column(Boolean, server_default=false(), nullable=False)
    inline_blob: Optional[bytes] = Column(LargeBinary, nullable=True)

242 

243 

# Stores the bot session state. The assigned lease is recorded directly in
# this table rather than through an SQLAlchemy relationship to the 'leases'
# table, because the 'leases' table depends on the type of data store
# selected and might never be populated.
class BotEntry(Base):
    __tablename__ = "bots"

    # --- Immutable data ---
    name: str = Column(String, primary_key=True, index=True, nullable=False)
    bot_id: str = Column(String, index=True, nullable=False)
    instance_name: str = Column(String, nullable=False)

    # --- Scheduling state ---
    bot_status: int = Column(Integer, nullable=False)
    lease_id: Optional[str] = Column(String, nullable=True)

    # --- Auditing data ---
    expiry_time: datetime.datetime = Column(DateTime, nullable=False, index=True)
    last_update_timestamp: datetime.datetime = Column(DateTime, nullable=False, index=True)

    # ``lease_id`` carries no ForeignKey constraint; the join condition marks
    # it as the foreign side and matches it against ``JobEntry.name``.
    job: Optional[JobEntry] = relationship(
        JobEntry,
        primaryjoin="foreign(BotEntry.lease_id) == JobEntry.name",
    )

265 

266 

# Table used by the SQLStorage CAS backend to keep blobs in a database.
class BlobEntry(Base):
    __tablename__ = "blobs"

    digest_hash: str = Column(String, primary_key=True)
    digest_size_bytes: int = Column(BigInteger, nullable=False)
    data: bytes = Column(LargeBinary, nullable=False)

275 

276 

def digest_to_string(digest: Digest) -> str:
    """Render *digest* as the string ``"<hash>/<size_bytes>"``."""
    digest_hash, size_bytes = digest.hash, digest.size_bytes
    return f"{digest_hash}/{size_bytes}"

279 

280 

def string_to_digest(string: str) -> Digest:
    """Parse a ``"<hash>/<size_bytes>"`` string into a ``Digest``.

    The string is split on the first ``/`` only.

    Raises:
        ValueError: if *string* contains no ``/`` or the size part is not an
            integer literal.
    """
    hash_part, size_part = string.split("/", maxsplit=1)
    return Digest(hash=hash_part, size_bytes=int(size_part))

283 return Digest(hash=digest_hash, size_bytes=int(size_bytes))