Coverage for quality/models.py: 96%

207 statements  

« prev     ^ index     » next       coverage.py v7.6.12, created at 2025-03-26 15:42 +0000

1import datetime 

2from itertools import groupby 

3from typing import Any 

4 

5import pendulum 

6from django.db import models 

7from django.db.models import Exists, F, FilteredRelation, Max, Min, OuterRef, Q 

8from django.urls import reverse 

9from obspy.core.inventory import Inventory 

10from pendulum.date import Date 

11from pendulum.datetime import DateTime 

12 

13 

class ChannelManager(models.Manager["Channel"]):
    """Manager able to synchronise ``Channel`` rows from an obspy inventory."""

    def populate(self, inventory: Inventory) -> None:
        """Insert or update the most recent epoch of every channel of *inventory*.

        An inventory may list several epochs for the same
        (station, code, location); only the epoch with the latest end date is
        stored. An end date located in the future, or a missing end date, is
        stored as ``NULL`` (channel still open).

        Raises:
            KeyError: if a network or station of the inventory is not stored
                in the database yet.
        """
        # {network code: {station code: station pk}} lookup table.
        stations = {
            network.code: {
                station.code: station.pk for station in network.station_set.all()
            }
            for network in Network.objects.prefetch_related("station_set").only(
                "code", "station__code", "station__id"
            )
        }
        # Evaluated once so every channel is compared to the same instant.
        now = pendulum.now("utc")
        channels = []
        for network in inventory.networks:
            for station in network.stations:
                for channel in station.channels:
                    # obspy leaves end_date to None for an open epoch.
                    raw_end_date = (
                        None if channel.end_date is None else channel.end_date.datetime
                    )
                    end_date = (
                        None
                        if raw_end_date is None
                        else raw_end_date.replace(tzinfo=datetime.UTC)
                    )
                    if end_date is not None and end_date > now:
                        end_date = None
                    channels.append(
                        {
                            "code": channel.code,
                            "location": channel.location_code,
                            "end_date": end_date,
                            "raw_end_date": raw_end_date,
                            "station_id": stations[network.code][station.code],
                        }
                    )

        channels.sort(
            reverse=True,
            key=lambda x: (
                x["station_id"],
                x["code"],
                x["location"],
                # An epoch without end date is the most recent one: the flag
                # puts it first (reverse order) and avoids comparing None.
                x["raw_end_date"] is None,
                x["raw_end_date"] or datetime.datetime.min,
            ),
        )  # Channels are now (reverse) sorted by nslc's and end_date
        # Here, we keep only the last epoch of each channel
        to_insert = []
        for _, grouped in groupby(
            channels,
            lambda x: (x["station_id"], x["code"], x["location"]),
        ):
            latest = next(grouped)  # first element has the latest end_date
            del latest["raw_end_date"]  # sort helper, not a model field
            to_insert.append(Channel(**latest))
        Channel.objects.bulk_create(
            to_insert,
            update_conflicts=True,
            update_fields=["code", "location", "end_date", "station_id"],
            unique_fields=["code", "station", "location"],
        )

66 

67 

class ChannelQuerySet(models.QuerySet["Channel"]):
    """Chainable filters and helpers for ``Channel`` rows."""

    def checks_exist(self) -> "ChannelQuerySet":
        """Keep only the channels having at least one ``Check`` row."""
        related_checks = Check.objects.filter(channel__pk=OuterRef("pk"))
        return self.filter(Exists(related_checks))

    def instruments(self) -> list[str]:
        """Return the sorted, de-duplicated first two characters of the codes."""
        prefixes = set()
        for row in self.values("code"):
            prefixes.add(row["code"][:2])
        return sorted(prefixes)

    def opened(self, day: DateTime) -> "ChannelQuerySet":
        """Keep channels whose station started on or before *day* and which
        have no end date or an end date later than *day*.
        """
        station_started = Q(station__start_date__lte=day)
        not_closed = Q(end_date__isnull=True) | Q(end_date__gt=day)
        return self.filter(station_started, not_closed)

80 

81 

class Channel(models.Model):
    """A channel of a station, identified by its code and location."""

    code = models.CharField(max_length=3)
    location = models.CharField(max_length=8)
    station = models.ForeignKey("Station", on_delete=models.CASCADE)
    # NULL is handled as "not closed" by ChannelQuerySet.opened().
    end_date = models.DateTimeField(null=True, blank=True)
    objects = ChannelManager.from_queryset(ChannelQuerySet)()

    class Meta:
        unique_together = (("code", "station", "location"),)

    def __str__(self) -> str:
        """Return ``<station>.<location>.<code>``."""
        parts = (self.station, self.location, self.code)
        return ".".join(str(part) for part in parts)

94 

95 

class CheckManager(models.Manager["Check"]):
    """Manager storing check results and repairing the ``Check`` table."""

    def store(  # noqa: PLR0913
        self,
        nslc: str,
        date: Date,
        result: int = 0,
        completeness: int = 0,
        trace_count: int = 0,
        shortest_trace: int = 0,
    ) -> None:
        """Create or update the check of channel *nslc* for *date*.

        *nslc* must hold exactly four dot-separated parts. The channel is
        looked up by station code, location and channel code; the first part
        is discarded. A new check starts with ``retries`` at 0, an existing
        one gets ``retries`` incremented by one.

        NOTE(review): as the first part is ignored, a station code shared by
        several networks would make the lookup ambiguous - confirm this
        cannot happen.
        """
        _, station_code, location_code, channel_code = nslc.split(".")
        channel_checked = Channel.objects.only("pk").get(
            code=channel_code,
            location=location_code,
            station__code=station_code,
        )
        # Values written identically on creation and on update.
        measures = {
            "result": result,
            "completeness": completeness,
            "shortest_trace": shortest_trace,
            "trace_count": trace_count,
        }
        self.update_or_create(
            channel_id=channel_checked.pk,
            date=date,
            defaults={"retries": F("retries") + 1, **measures},
            create_defaults={"retries": 0, **measures},
        )

    def fix_missing(self) -> None:
        """Create a default check for every (channel, day) lacking one.

        The period goes from the oldest stored check date to yesterday (UTC).
        """
        # To find where missing checks are:
        # - first, we select theoretical checks (one check per day per channel)
        # - then, we select actual checks stored in db
        # - finally, we take the difference between theoretical and actual checks
        query = """
            select id, period.day as day
            from quality_channel
            join (
                select date_trunc('day', generate_series(%s::date, %s, %s)) as day
            ) as period on True
            where end_date > period.day or end_date is null
            except
            select channel_id, date from quality_check
            """
        first_day = Check.objects.aggregate(min=Min("date"))["min"]
        last_day = pendulum.yesterday("utc").date()
        missing_checks = []
        for row in Channel.objects.raw(query, [first_day, last_day, "1 day"]):
            missing_checks.append(Check(channel_id=row.id, date=row.day))
        Check.objects.bulk_create(missing_checks)

    def fix_closed_channels(self) -> None:
        """Mark as ``CHANNEL_CLOSED`` the checks dated on or after the end
        date of their channel, within the year of that end date.

        The period goes from the oldest stored check date to yesterday (UTC).
        Existing checks are updated, missing ones are created.
        """
        # To find where checks for closed channels are:
        # - first, we select which checks should be marked as closed
        # - then, we select actual checks already marked as closed
        # - finally, we take the difference between theoretical and actual checks
        query = """
            select id, period.day as day
            from quality_channel
            join (select generate_series(%s::date, %s, %s) as day) as period on True
            where
                end_date <= period.day
                and extract(year from end_date) = extract(year from period.day)
            except
            select channel_id, date from quality_check where result = %s::integer
            """
        first_day = Check.objects.aggregate(min=Min("date"))["min"]
        last_day = pendulum.yesterday("utc").date()
        params = [first_day, last_day, "1 day", Check.Result.CHANNEL_CLOSED]
        closed_channel_checks = []
        for row in Channel.objects.raw(query, params):
            closed_channel_checks.append(
                Check(
                    channel_id=row.id,
                    date=row.day,
                    result=Check.Result.CHANNEL_CLOSED,
                )
            )
        Check.objects.bulk_create(
            closed_channel_checks,
            update_conflicts=True,
            update_fields=["result"],
            unique_fields=["channel_id", "date"],
        )

186 

187 

class CheckQuerySet(models.QuerySet["Check"]):
    """Chainable filters for ``Check`` rows.

    Every optional filter returns the queryset untouched when its argument is
    empty (or ``None`` for ``triggered``).
    """

    def failed(self) -> "CheckQuerySet":
        """Keep checks whose result is at most ``DECONVOLUTION_FAILS`` and
        whose retry count is below ``CHECK_MAX_RETRIES``.
        """
        return self.filter(
            result__lte=Check.Result.DECONVOLUTION_FAILS,
            retries__lt=CHECK_MAX_RETRIES,
        )

    def year(self, year: int | None = None) -> "CheckQuerySet":
        """Restrict checks to *year*, or to the last month when no year is given.

        Channels must have no end date or an end date in the requested year
        (the current UTC year when *year* is falsy), and their station must
        have started during or before that year.
        """
        if year:
            requested_year = year
            period_filter = Q(date__year=year)
        else:
            requested_year = pendulum.today("utc").year
            one_month_ago = pendulum.today("utc").subtract(months=1)
            period_filter = Q(date__gte=one_month_ago)
        channel_filter = Q(channel__end_date=None) | Q(
            channel__end_date__year=requested_year
        )
        return self.filter(
            period_filter,
            channel_filter,
            channel__station__start_date__year__lte=requested_year,
        )

    def networks(self, networks: list[str] | None = None) -> "CheckQuerySet":
        """Keep checks whose network code is one of *networks*."""
        if not networks:
            return self
        return self.filter(channel__station__network__code__in=networks)

    def stations(self, stations: list[str] | None = None) -> "CheckQuerySet":
        """Keep checks whose station code is one of *stations*."""
        if not stations:
            return self
        return self.filter(channel__station__code__in=stations)

    def channels(self, channels: list[str] | None = None) -> "CheckQuerySet":
        """Keep checks whose channel code starts (case-insensitively) with
        one of *channels*.
        """
        if not channels:
            return self
        any_prefix = Q()
        for prefix in channels:
            any_prefix |= Q(channel__code__istartswith=prefix)
        return self.filter(any_prefix)

    def operators(self, operators: list[str] | None = None) -> "CheckQuerySet":
        """Keep checks of stations having an operator whose agency contains
        (case-insensitively) one of *operators*.
        """
        if not operators:
            return self
        any_agency = Q()
        for agency in operators:
            any_agency |= Q(channel__station__operators__agency__icontains=agency)
        return self.filter(any_agency)

    def triggered(self, triggered: bool | None = None) -> "CheckQuerySet":
        """Keep checks of stations whose ``triggered`` flag equals *triggered*."""
        if triggered is None:
            return self
        return self.filter(channel__station__triggered=triggered)

    def summary(self) -> Any:
        """Return checks as tuples, sorted by descending network, station,
        location and channel code, then by ascending date.
        """
        columns = (
            "channel_id",
            "channel__code",
            "date",
            "completeness",
            "result",
            "trace_count",
            "shortest_trace",
            "channel__location",
            "channel__station__code",
            "channel__station__network__code",
        )
        ordering = (
            "-channel__station__network__code",
            "-channel__station__code",
            "-channel__location",
            "-channel__code",
            "date",
        )
        joined = self.select_related("channel__station__network")
        return joined.values_list(*columns).order_by(*ordering)

262 

263 

class Check(models.Model):
    """Result of the check of one channel for one day."""

    class Result(models.IntegerChoices):
        """Possible outcomes of a check."""

        NO_DATA = 0
        NOT_READABLE = 1
        DECONVOLUTION_FAILS = 2
        DECONVOLUTION_PASS = 3
        CHANNEL_CLOSED = 4

    channel = models.ForeignKey(Channel, on_delete=models.CASCADE)
    date = models.DateField()
    result = models.IntegerField(choices=Result, default=Result.NO_DATA)
    # Incremented by CheckManager.store() each time an existing check is updated.
    retries = models.IntegerField(default=0)
    completeness = models.IntegerField(default=0)
    shortest_trace = models.IntegerField(default=0)
    trace_count = models.IntegerField(default=0)
    objects = CheckManager.from_queryset(CheckQuerySet)()

    class Meta:
        # At most one check per channel and per day.
        unique_together = (("channel", "date"),)

    def __str__(self) -> str:
        """Return ``<channel> - <date>``."""
        return " - ".join((str(self.channel), str(self.date)))

286 

287 

class NetworkManager(models.Manager["Network"]):
    """Manager able to synchronise ``Network`` rows from an obspy inventory."""

    def populate(self, inventory: Inventory) -> None:
        """Insert the networks of *inventory*, updating the description of
        the networks already stored (matched on ``code``).
        """
        values = [
            # obspy leaves the description to None when it is not provided,
            # whereas the column is a non-null text field.
            Network(code=network.code, description=network.description or "")
            for network in inventory.networks
        ]
        Network.objects.bulk_create(
            values,
            update_conflicts=True,
            update_fields=["description"],
            unique_fields=["code"],
        )

300 

301 

class Network(models.Model):
    """A network, identified by its unique code."""

    code = models.CharField(max_length=8, unique=True)
    description = models.TextField(default="", blank=True)
    objects = NetworkManager()

    def __str__(self) -> str:
        """Return the network code."""
        return self.code

309 

310 

class StationManager(models.Manager["Station"]):
    """Manager able to synchronise ``Station`` rows from an obspy inventory."""

    @staticmethod
    def _as_utc(value: Any) -> datetime.datetime | None:
        """Convert an obspy date (or None) to a UTC-aware datetime (or None)."""
        if value is None:
            return None
        return value.datetime.replace(tzinfo=datetime.UTC)

    def populate(self, inventory: Inventory) -> None:
        """Insert or update the stations of *inventory*.

        Stations are matched on (code, network). An end date located in the
        future, or a missing end date, is stored as ``NULL``; a missing start
        date is stored as ``NULL`` too.

        Raises:
            KeyError: if a network of the inventory is not stored in the
                database yet.
        """
        networks = {
            network.code: network.pk for network in Network.objects.only("pk", "code")
        }
        # Evaluated once so every station is compared to the same instant.
        now = pendulum.now("utc")
        values = []
        for network in inventory.networks:
            for station in network.stations:
                end_date = self._as_utc(station.end_date)
                if end_date is not None and end_date > now:
                    end_date = None
                values.append(
                    Station(
                        code=station.code,
                        start_date=self._as_utc(station.start_date),
                        end_date=end_date,
                        network_id=networks[network.code],
                    )
                )
        Station.objects.bulk_create(
            values,
            update_conflicts=True,
            update_fields=["code", "start_date", "end_date", "network_id"],
            unique_fields=["code", "network_id"],
        )

335 

336 

class StationQuerySet(models.QuerySet["Station"]):
    """Chainable filters for ``Station`` rows.

    ``year()`` and ``summary()`` rely on the ``f_channels`` relation, which
    is only defined once ``channels()`` has been called on the queryset.
    """

    def overview(self, network_id: int) -> Any:
        """Return code, id and lowest check result over the last seven days
        for the stations of network *network_id*, ordered by code.

        Only channels without end date or ending during the current UTC year
        are considered. The query starts from ``Station.objects``, not from
        the current queryset.
        """
        today = pendulum.today("utc")
        current_year = today.year
        one_week_ago = pendulum.today("utc").subtract(days=7)
        channel_filter = Q(channel__end_date=None) | Q(
            channel__end_date__year=current_year
        )
        stations = Station.objects.filter(
            channel_filter,
            network_id=network_id,
            channel__check__date__gt=one_week_ago,
            start_date__year__lte=current_year,
        )
        worst = stations.annotate(result=Min("channel__check__result"))
        return worst.values("code", "id", "result").order_by("code")

    def opened(self, day: DateTime) -> "StationQuerySet":
        """Keep stations started on or before *day* which have no end date
        or an end date later than *day*.
        """
        started = Q(start_date__lte=day)
        not_closed = Q(end_date__isnull=True) | Q(end_date__gt=day)
        return self.filter(started, not_closed)

    def networks(self, networks: list[str] | None = None) -> "StationQuerySet":
        """Keep stations whose network code is one of *networks*, if any."""
        if not networks:
            return self
        return self.filter(network__code__in=networks)

    def stations(self, stations: list[str] | None = None) -> "StationQuerySet":
        """Keep stations whose code is one of *stations*, if any."""
        if not stations:
            return self
        return self.filter(code__in=stations)

    def channels(self, channels: list[str] | None = None) -> "StationQuerySet":
        """Define the ``f_channels`` relation, limited to the channels whose
        code starts (case-insensitively) with one of *channels*.

        The relation is always defined; without *channels* its condition is
        an empty ``Q``.
        """
        condition = Q()
        for prefix in channels or ():
            condition |= Q(channel__code__istartswith=prefix)
        relation = FilteredRelation("channel", condition=condition)
        return self.annotate(f_channels=relation)

    def operators(self, operators: list[str] | None = None) -> "StationQuerySet":
        """Keep stations having an operator whose agency contains
        (case-insensitively) one of *operators*, if any.
        """
        if not operators:
            return self
        any_agency = Q()
        for agency in operators:
            any_agency |= Q(operators__agency__icontains=agency)
        return self.filter(any_agency)

    def year(self, year: int | None = None) -> "StationQuerySet":
        """Restrict to checks of *year*, or of the last month when no year
        is given (the current UTC year is then the requested year).

        Filtered channels must have no end date or end during the requested
        year, and the station must have started during or before it.
        """
        if year:
            requested_year = year
            period_filter = Q(f_channels__check__date__year=year)
        else:
            requested_year = pendulum.today("utc").year
            one_month_ago = pendulum.today("utc").subtract(months=1)
            period_filter = Q(f_channels__check__date__gte=one_month_ago)
        channel_filter = Q(f_channels__end_date=None) | Q(
            f_channels__end_date__year=requested_year
        )
        return self.filter(
            period_filter,
            channel_filter,
            start_date__year__lte=requested_year,
        )

    def triggered(self, triggered: bool | None = None) -> "StationQuerySet":
        """Keep stations whose ``triggered`` flag equals *triggered*."""
        if triggered is None:
            return self
        return self.filter(triggered=triggered)

    def summary(self) -> Any:
        """Return tuples aggregating the checks of the filtered channels,
        sorted by descending network and station code, then by ascending
        check date.

        Completeness, result and shortest trace are minimums; trace count is
        a maximum.
        """
        aggregates = {
            "completeness": Min("f_channels__check__completeness"),
            "result": Min("f_channels__check__result"),
            "trace_count": Max("f_channels__check__trace_count"),
            "shortest_trace": Min("f_channels__check__shortest_trace"),
        }
        columns = (
            "code",
            "pk",
            "f_channels__check__date",
            "completeness",
            "result",
            "trace_count",
            "shortest_trace",
            "network__code",
        )
        ordering = ("-network__code", "-code", "f_channels__check__date")
        aggregated = self.select_related("network").annotate(**aggregates)
        return aggregated.values_list(*columns).order_by(*ordering)

431 

432 

class Station(models.Model):
    """A station of a network, identified by its code within that network."""

    code = models.CharField(max_length=8)
    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    start_date = models.DateTimeField(null=True, blank=True)
    # NULL is handled as "not closed" by StationQuerySet.opened().
    end_date = models.DateTimeField(null=True, blank=True)
    triggered = models.BooleanField(default=False)
    objects = StationManager.from_queryset(StationQuerySet)()

    class Meta:
        unique_together = (("code", "network"),)

    def __str__(self) -> str:
        """Return ``<network>.<code>``."""
        return ".".join((str(self.network), str(self.code)))

    def get_absolute_url(self) -> str:
        """Return the URL of the ``quality:station-detail`` view of this station."""
        url_kwargs = {"pk": self.pk}
        return reverse("quality:station-detail", kwargs=url_kwargs)

449 

450 

class OperatorManager(models.Manager["Operator"]):
    """Manager able to synchronise operators and their station links from an
    obspy inventory.
    """

    def populate(self, inventory: Inventory) -> None:
        """Insert one ``Operator`` per distinct agency found in *inventory*.

        NOTE(review): on conflict the ``website`` column is updated with the
        value of the freshly built instance, which is never set here - this
        looks like it resets the website of existing operators; confirm.
        """
        operators = [
            Operator(agency=item)
            for item in {
                operator.agency
                for network in inventory.networks
                for station in network.stations
                for operator in station.operators
            }
        ]
        Operator.objects.bulk_create(
            operators,
            update_conflicts=True,
            update_fields=["website"],
            unique_fields=["agency"],
        )

    def link_to_stations(self, inventory: Inventory) -> None:
        """Replace every station/operator link by the links of *inventory*.

        All existing links are deleted first, whatever their station.

        Raises:
            KeyError: if a network, station or operator of the inventory is
                not stored in the database yet.
        """
        operators = {
            operator.agency: operator.pk
            for operator in Operator.objects.only("pk", "agency")
        }
        # {network code: {station code: station pk}} lookup table.
        stations = {
            network.code: {
                station.code: station.pk for station in network.station_set.all()
            }
            for network in Network.objects.prefetch_related("station_set").only(
                "code", "station__code"
            )
        }
        relationship = Station.operators.through
        # The same (station, operator) pair may show up several times in an
        # inventory, while the link table accepts each pair only once: keep
        # the first occurrence, preserving the inventory order.
        pairs = dict.fromkeys(
            (stations[network.code][station.code], operators[operator.agency])
            for network in inventory.networks
            for station in network.stations
            for operator in station.operators
        )
        relations = [
            relationship(station_id=station_id, operator_id=operator_id)
            for station_id, operator_id in pairs
        ]
        relationship.objects.all().delete()
        relationship.objects.bulk_create(relations)

    def purge_obsoletes(self) -> None:
        """Delete the operators that are not linked to any station."""
        self.filter(stations__isnull=True).delete()

497 

498 

class OperatorQuerySet(models.QuerySet["Operator"]):
    """Chainable helpers for ``Operator`` rows."""

    def acronyms_only(self) -> list[str]:
        """Return, for each operator, the upper-cased text found between the
        first ``(`` and the following ``)`` of its agency, or the agency
        unchanged when it holds no ``(``.
        """
        acronyms = []
        for operator in self:
            label = operator.agency
            if "(" in label:
                after_paren = label.upper().split("(")[1]
                label = after_paren.split(")")[0]
            acronyms.append(label)
        return acronyms

    def checks_exist(self) -> models.QuerySet["Operator"]:
        """Keep, without duplicates, the operators having a station with a
        channel that has at least one ``Check`` row.
        """
        related_checks = Check.objects.filter(channel=OuterRef("stations__channel"))
        return self.filter(Exists(related_checks)).distinct()

512 

513 

class Operator(models.Model):
    """An agency operating stations, identified by its unique agency name."""

    agency = models.CharField(max_length=255, unique=True)
    website = models.URLField(blank=True)
    # Reachable from a station through ``station.operators``.
    stations = models.ManyToManyField(Station, related_name="operators")
    objects = OperatorManager.from_queryset(OperatorQuerySet)()

    def __str__(self) -> str:
        """Return the agency name."""
        return self.agency

522 

523 

# Maximum retries for each check: CheckQuerySet.failed() only returns checks
# whose retry count is strictly below this value.
CHECK_MAX_RETRIES = 3

# Weight associated with each check result.
# NOTE(review): values span 0-255, presumably a display intensity - confirm
# against the code consuming this mapping.
RESULT_PONDERATION = {
    Check.Result.NO_DATA: 50,
    Check.Result.NOT_READABLE: 100,
    Check.Result.DECONVOLUTION_FAILS: 150,
    Check.Result.DECONVOLUTION_PASS: 255,
    Check.Result.CHANNEL_CLOSED: 0,
}