Coverage for quality/models.py: 96%
207 statements
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-26 15:42 +0000
« prev ^ index » next coverage.py v7.6.12, created at 2025-03-26 15:42 +0000
1import datetime
2from itertools import groupby
3from typing import Any
5import pendulum
6from django.db import models
7from django.db.models import Exists, F, FilteredRelation, Max, Min, OuterRef, Q
8from django.urls import reverse
9from obspy.core.inventory import Inventory
10from pendulum.date import Date
11from pendulum.datetime import DateTime
class ChannelManager(models.Manager["Channel"]):
    """Manager able to synchronise ``Channel`` rows with a station inventory."""

    @staticmethod
    def _stored_end_date(
        raw_end: datetime.datetime | None,
        now: datetime.datetime,
    ) -> datetime.datetime | None:
        """Convert a naive epoch end date into the value stored on ``Channel``.

        Args:
            raw_end: Naive end date of the epoch, or ``None`` when the
                inventory gives no end date.
            now: Timezone-aware reference instant.

        Returns:
            ``None`` when the epoch has no end date or ends after ``now``
            (the channel is stored as still open), otherwise ``raw_end``
            tagged as UTC.
        """
        if raw_end is None:
            return None
        aware_end = raw_end.replace(tzinfo=datetime.UTC)
        return None if aware_end > now else aware_end

    def populate(self, inventory: Inventory) -> None:
        """Insert or update one ``Channel`` per (station, code, location).

        Only the latest epoch of each channel found in ``inventory`` is kept.
        An epoch without end date is considered the latest one.

        Args:
            inventory: Inventory whose networks, stations and channels are read.

        Raises:
            KeyError: If a channel belongs to a network or station that is not
                already stored in the database.
        """
        # Evaluated once so that every channel is compared to the same instant.
        now = pendulum.now("utc")
        station_ids = {
            network.code: {
                station.code: station.pk for station in network.station_set.all()
            }
            for network in Network.objects.prefetch_related("station_set").only(
                "code", "station__code", "station__id"
            )
        }

        # Latest (naive) end date seen for each (station_id, code, location).
        # A ``None`` value means an epoch without end date was found.
        latest: dict[tuple[int, str, str], datetime.datetime | None] = {}
        for network in inventory.networks:
            for station in network.stations:
                for channel in station.channels:
                    key = (
                        station_ids[network.code][station.code],
                        channel.code,
                        channel.location_code,
                    )
                    raw_end = (
                        None
                        if channel.end_date is None
                        else channel.end_date.datetime
                    )
                    if key not in latest:
                        latest[key] = raw_end
                        continue
                    current = latest[key]
                    if current is None:
                        # An open-ended epoch is already recorded: nothing
                        # can be later than it.
                        continue
                    if raw_end is None or raw_end > current:
                        latest[key] = raw_end

        # Keys are unique, so sorting on them alone is enough; the reverse
        # order mirrors the historical insertion order.
        to_insert = [
            Channel(
                code=code,
                location=location,
                end_date=self._stored_end_date(raw_end, now),
                station_id=station_id,
            )
            for (station_id, code, location), raw_end in sorted(
                latest.items(),
                key=lambda item: item[0],
                reverse=True,
            )
        ]
        Channel.objects.bulk_create(
            to_insert,
            update_conflicts=True,
            update_fields=["code", "location", "end_date", "station_id"],
            unique_fields=["code", "station", "location"],
        )
class ChannelQuerySet(models.QuerySet["Channel"]):
    """Chainable filters and helpers for ``Channel`` querysets."""

    def checks_exist(self) -> "ChannelQuerySet":
        """Keep only the channels referenced by at least one ``Check``."""
        related_checks = Check.objects.filter(channel__pk=OuterRef("pk"))
        return self.filter(Exists(related_checks))

    def instruments(self) -> list[str]:
        """Return the sorted, de-duplicated two-character prefixes of the codes."""
        prefixes = {code[:2] for code in self.values_list("code", flat=True)}
        return sorted(prefixes)

    def opened(self, day: DateTime) -> "ChannelQuerySet":
        """Keep the channels that are open on ``day``.

        A channel is kept when its station started on or before ``day`` and
        the channel has no end date or ends strictly after ``day``.
        """
        station_started = Q(station__start_date__lte=day)
        not_yet_closed = Q(end_date__isnull=True) | Q(end_date__gt=day)
        return self.filter(station_started, not_yet_closed)
class Channel(models.Model):
    """A channel of a station, identified by its code and location."""

    code = models.CharField(max_length=3)
    location = models.CharField(max_length=8)
    station = models.ForeignKey("Station", on_delete=models.CASCADE)
    # ``None`` is what ``ChannelManager.populate`` stores for channels whose
    # last epoch ends in the future.
    end_date = models.DateTimeField(null=True, blank=True)

    objects = ChannelManager.from_queryset(ChannelQuerySet)()

    class Meta:
        unique_together = (("code", "station", "location"),)

    def __str__(self) -> str:
        """Return ``<station>.<location>.<code>``."""
        parts = (self.station, self.location, self.code)
        return ".".join(str(part) for part in parts)
class CheckManager(models.Manager["Check"]):
    """Manager storing check results and repairing the ``Check`` table."""

    def store(  # noqa: PLR0913
        self,
        nslc: str,
        date: Date,
        result: int = 0,
        completeness: int = 0,
        trace_count: int = 0,
        shortest_trace: int = 0,
    ) -> None:
        """Create or update the check of a channel for a given day.

        When the check already exists its ``retries`` counter is incremented,
        otherwise the check is created with ``retries`` set to 0.

        Args:
            nslc: Dot-separated ``network.station.location.channel`` identifier.
            date: Day the check applies to.
            result: Check result (see ``Check.Result``).
            completeness: Value stored in ``Check.completeness``.
            trace_count: Value stored in ``Check.trace_count``.
            shortest_trace: Value stored in ``Check.shortest_trace``.

        Raises:
            ValueError: If ``nslc`` does not contain exactly four parts.
            Channel.DoesNotExist: If no channel matches ``nslc``.
        """
        network, station, location, channel = nslc.split(".")
        # Station codes are only unique within a network, so the network has
        # to be part of the lookup to identify a single channel.
        channel_checked = Channel.objects.only("pk").get(
            code=channel,
            location=location,
            station__code=station,
            station__network__code=network,
        )
        measurements = {
            "result": result,
            "completeness": completeness,
            "shortest_trace": shortest_trace,
            "trace_count": trace_count,
        }
        self.update_or_create(
            channel_id=channel_checked.pk,
            date=date,
            defaults={"retries": F("retries") + 1, **measurements},
            create_defaults={"retries": 0, **measurements},
        )

    def fix_missing(self) -> None:
        """Create the checks that are missing from the database.

        To find where missing checks are:
        - first, we select theoretical checks (a check per day per channel),
          from the oldest stored check up to yesterday (UTC);
        - then, we select actual checks stored in db;
        - finally, we take the difference between theoretical and actual checks.

        Missing checks are created with the default field values.
        """
        start = Check.objects.aggregate(min=Min("date"))["min"]
        if start is None:
            # No check stored yet: there is no period to inspect.
            return
        end = pendulum.yesterday("utc").date()
        difference = Channel.objects.raw(
            """
            select id, period.day as day
            from quality_channel
            join (
                select date_trunc('day', generate_series(%s::date, %s, %s)) as day
            ) as period on True
            where end_date > period.day or end_date is null
            except
            select channel_id, date from quality_check
            """,
            [start, end, "1 day"],
        )
        missing_checks = [
            Check(channel_id=item.id, date=item.day) for item in difference
        ]
        Check.objects.bulk_create(missing_checks)

    def fix_closed_channels(self) -> None:
        """Mark as ``CHANNEL_CLOSED`` the checks of days following a closure.

        To find where checks for closed channels are:
        - first, we select which checks should be marked as closed (days on
          or after the channel end date, within the year of that end date);
        - then, we select actual checks already marked as closed;
        - finally, we take the difference between theoretical and actual checks.

        Resulting checks are inserted, or have their ``result`` updated when a
        check already exists for the same channel and day.
        """
        start = Check.objects.aggregate(min=Min("date"))["min"]
        if start is None:
            # No check stored yet: there is no period to inspect.
            return
        end = pendulum.yesterday("utc").date()
        difference = Channel.objects.raw(
            """
            select id, period.day as day
            from quality_channel
            join (select generate_series(%s::date, %s, %s) as day) as period on True
            where
                end_date <= period.day
                and extract(year from end_date) = extract(year from period.day)
            except
            select channel_id, date from quality_check where result = %s::integer
            """,
            [start, end, "1 day", Check.Result.CHANNEL_CLOSED],
        )
        closed_channel_checks = [
            Check(channel_id=item.id, date=item.day, result=Check.Result.CHANNEL_CLOSED)
            for item in difference
        ]
        Check.objects.bulk_create(
            closed_channel_checks,
            update_conflicts=True,
            update_fields=["result"],
            unique_fields=["channel_id", "date"],
        )
class CheckQuerySet(models.QuerySet["Check"]):
    """Chainable filters for ``Check`` querysets.

    Filters taking an optional argument return the queryset untouched when
    the argument is empty or ``None``.
    """

    def failed(self) -> "CheckQuerySet":
        """Keep checks that did not pass and can still be retried.

        A check is kept when its result is at most ``DECONVOLUTION_FAILS``
        and its retry counter is below ``CHECK_MAX_RETRIES``.
        """
        can_retry = Q(retries__lt=CHECK_MAX_RETRIES)
        not_passed = Q(result__lte=Check.Result.DECONVOLUTION_FAILS)
        return self.filter(can_retry & not_passed)

    def year(self, year: int | None = None) -> "CheckQuerySet":
        """Restrict checks to a year, or to the last month when none is given.

        In both cases, only channels without end date or ending during the
        considered year are kept, and only stations started during or before
        that year.
        """
        if not year:
            requested_year = pendulum.today("utc").year
            period_filter = Q(date__gte=pendulum.today("utc").subtract(months=1))
        else:
            requested_year = year
            period_filter = Q(date__year=requested_year)

        channel_filter = Q(channel__end_date=None) | Q(
            channel__end_date__year=requested_year
        )
        return self.filter(
            period_filter,
            channel_filter,
            channel__station__start_date__year__lte=requested_year,
        )

    def networks(self, networks: list[str] | None = None) -> "CheckQuerySet":
        """Keep checks whose network code is one of ``networks``."""
        if not networks:
            return self
        return self.filter(channel__station__network__code__in=networks)

    def stations(self, stations: list[str] | None = None) -> "CheckQuerySet":
        """Keep checks whose station code is one of ``stations``."""
        if not stations:
            return self
        return self.filter(channel__station__code__in=stations)

    def channels(self, channels: list[str] | None = None) -> "CheckQuerySet":
        """Keep checks whose channel code starts with one of ``channels``.

        The comparison is case-insensitive.
        """
        if not channels:
            return self
        any_prefix = Q()
        for prefix in channels:
            any_prefix = any_prefix | Q(channel__code__istartswith=prefix)
        return self.filter(any_prefix)

    def operators(self, operators: list[str] | None = None) -> "CheckQuerySet":
        """Keep checks of stations having an operator matching ``operators``.

        An operator matches when its agency contains one of the given strings
        (case-insensitive).
        """
        if not operators:
            return self
        any_agency = Q()
        for agency in operators:
            any_agency = any_agency | Q(
                channel__station__operators__agency__icontains=agency
            )
        return self.filter(any_agency)

    def triggered(self, triggered: bool | None = None) -> "CheckQuerySet":
        """Keep checks of stations whose ``triggered`` flag equals the argument."""
        if triggered is None:
            return self
        return self.filter(channel__station__triggered=triggered)

    def summary(self) -> Any:
        """Return the checks as tuples, ordered for display.

        Rows are ordered by descending network, station, location and channel
        code, then by ascending date.
        """
        columns = (
            "channel_id",
            "channel__code",
            "date",
            "completeness",
            "result",
            "trace_count",
            "shortest_trace",
            "channel__location",
            "channel__station__code",
            "channel__station__network__code",
        )
        ordering = (
            "-channel__station__network__code",
            "-channel__station__code",
            "-channel__location",
            "-channel__code",
            "date",
        )
        related = self.select_related("channel__station__network")
        return related.values_list(*columns).order_by(*ordering)
class Check(models.Model):
    """Result of the check of one channel for one day."""

    class Result(models.IntegerChoices):
        """Possible outcomes of a check."""

        NO_DATA = 0
        NOT_READABLE = 1
        DECONVOLUTION_FAILS = 2
        DECONVOLUTION_PASS = 3
        CHANNEL_CLOSED = 4

    channel = models.ForeignKey(Channel, on_delete=models.CASCADE)
    date = models.DateField()
    result = models.IntegerField(choices=Result, default=Result.NO_DATA)
    # Incremented by ``CheckManager.store`` each time an existing check is
    # stored again.
    retries = models.IntegerField(default=0)
    completeness = models.IntegerField(default=0)
    shortest_trace = models.IntegerField(default=0)
    trace_count = models.IntegerField(default=0)

    objects = CheckManager.from_queryset(CheckQuerySet)()

    class Meta:
        unique_together = (("channel", "date"),)

    def __str__(self) -> str:
        """Return ``<channel> - <date>``."""
        return "{} - {}".format(self.channel, self.date)
class NetworkManager(models.Manager["Network"]):
    """Manager able to synchronise ``Network`` rows with a station inventory."""

    def populate(self, inventory: Inventory) -> None:
        """Insert the networks of ``inventory`` or update their description.

        Args:
            inventory: Inventory whose networks are read.
        """
        values = [
            Network(
                code=network.code,
                # The inventory may carry no description while the column
                # does not accept NULL: fall back to an empty string.
                description=network.description or "",
            )
            for network in inventory.networks
        ]
        Network.objects.bulk_create(
            values,
            update_conflicts=True,
            update_fields=["description"],
            unique_fields=["code"],
        )
class Network(models.Model):
    """A network, identified by its unique code."""

    code = models.CharField(max_length=8, unique=True)
    description = models.TextField(default="", blank=True)

    objects = NetworkManager()

    def __str__(self) -> str:
        """Return the network code."""
        network_code = self.code
        return network_code
class StationManager(models.Manager["Station"]):
    """Manager able to synchronise ``Station`` rows with a station inventory."""

    @staticmethod
    def _to_utc(moment: Any) -> datetime.datetime | None:
        """Return ``moment.datetime`` tagged as UTC, or ``None`` if unset."""
        if moment is None:
            return None
        return moment.datetime.replace(tzinfo=datetime.UTC)

    def populate(self, inventory: Inventory) -> None:
        """Insert or update one ``Station`` per (code, network).

        Stations without end date, or whose end date is in the future, are
        stored with ``end_date=None``.

        Args:
            inventory: Inventory whose networks and stations are read.

        Raises:
            KeyError: If a station belongs to a network that is not already
                stored in the database.
        """
        # Evaluated once so that every station is compared to the same instant.
        now = pendulum.now("utc")
        networks = {
            network.code: network.pk for network in Network.objects.only("pk", "code")
        }
        values = []
        for network in inventory.networks:
            for station in network.stations:
                end_date = self._to_utc(station.end_date)
                if end_date is not None and end_date > now:
                    # A closure planned in the future means the station is
                    # still open today.
                    end_date = None
                values.append(
                    Station(
                        code=station.code,
                        start_date=self._to_utc(station.start_date),
                        end_date=end_date,
                        network_id=networks[network.code],
                    )
                )
        Station.objects.bulk_create(
            values,
            update_conflicts=True,
            update_fields=["code", "start_date", "end_date", "network_id"],
            unique_fields=["code", "network_id"],
        )
class StationQuerySet(models.QuerySet["Station"]):
    """Chainable filters for ``Station`` querysets.

    ``year`` and ``summary`` rely on the ``f_channels`` relation annotated by
    ``channels``, which therefore has to be called before them.
    """

    def overview(self, network_id: int) -> Any:
        """Return the worst recent check result of each station of a network.

        Only checks of the last seven days are considered, for channels
        without end date or ending during the current year, and for stations
        started during or before the current year.

        Args:
            network_id: Primary key of the network to summarise.

        Returns:
            Dictionaries with ``code``, ``id`` and ``result`` keys, ordered by
            station code.
        """
        current_year = pendulum.today("utc").year
        one_week_ago = pendulum.today("utc").subtract(days=7)
        # Built from ``self`` so that filters already applied to the queryset
        # are honoured.
        return (
            self.filter(
                Q(channel__end_date=None) | Q(channel__end_date__year=current_year),
                network_id=network_id,
                channel__check__date__gt=one_week_ago,
                start_date__year__lte=current_year,
            )
            .annotate(result=Min("channel__check__result"))
            .values("code", "id", "result")
            .order_by("code")
        )

    def opened(self, day: DateTime) -> "StationQuerySet":
        """Keep stations started on or before ``day`` and not ended by ``day``."""
        return self.filter(
            Q(start_date__lte=day),
            Q(end_date__isnull=True) | Q(end_date__gt=day),
        )

    def networks(self, networks: list[str] | None = None) -> "StationQuerySet":
        """Keep stations whose network code is one of ``networks``, if given."""
        if networks:
            return self.filter(network__code__in=networks)
        return self

    def stations(self, stations: list[str] | None = None) -> "StationQuerySet":
        """Keep stations whose code is one of ``stations``, if given."""
        if stations:
            return self.filter(code__in=stations)
        return self

    def channels(self, channels: list[str] | None = None) -> "StationQuerySet":
        """Annotate the ``f_channels`` filtered relation.

        The relation is restricted to channels whose code starts with one of
        ``channels`` (case-insensitive). It is annotated even when no prefix
        is given, in which case its condition is empty.
        """
        filters = Q()
        if channels:
            for channel in channels:
                filters |= Q(channel__code__istartswith=channel)
        return self.annotate(
            f_channels=FilteredRelation("channel", condition=filters),
        )

    def operators(self, operators: list[str] | None = None) -> "StationQuerySet":
        """Keep stations having an operator whose agency contains a given string."""
        if operators:
            filters = Q()
            for operator in operators:
                filters |= Q(operators__agency__icontains=operator)
            return self.filter(filters)
        return self

    def year(self, year: int | None = None) -> "StationQuerySet":
        """Restrict checks to a year, or to the last month when none is given.

        In both cases, only channels without end date or ending during the
        considered year are kept, and only stations started during or before
        that year. Lookups go through the ``f_channels`` relation.
        """
        if year:
            requested_year = year
            period_filter = Q(f_channels__check__date__year=requested_year)
        else:
            requested_year = pendulum.today("utc").year
            period_filter = Q(
                f_channels__check__date__gte=pendulum.today("utc").subtract(months=1),
            )
        return self.filter(
            period_filter,
            Q(f_channels__end_date=None) | Q(f_channels__end_date__year=requested_year),
            start_date__year__lte=requested_year,
        )

    def triggered(self, triggered: bool | None = None) -> "StationQuerySet":
        """Keep stations whose ``triggered`` flag equals the argument, if given."""
        if triggered is not None:
            return self.filter(triggered=triggered)
        return self

    def summary(self) -> Any:
        """Return aggregated check values as tuples, ordered for display.

        ``completeness``, ``result`` and ``shortest_trace`` are minimums and
        ``trace_count`` is a maximum, computed over the checks reached through
        the ``f_channels`` relation. Rows are ordered by descending network
        and station code, then by ascending check date.
        """
        return (
            self.select_related("network")
            .annotate(
                completeness=Min("f_channels__check__completeness"),
                result=Min("f_channels__check__result"),
                trace_count=Max("f_channels__check__trace_count"),
                shortest_trace=Min("f_channels__check__shortest_trace"),
            )
            .values_list(
                "code",
                "pk",
                "f_channels__check__date",
                "completeness",
                "result",
                "trace_count",
                "shortest_trace",
                "network__code",
            )
            .order_by(
                "-network__code",
                "-code",
                "f_channels__check__date",
            )
        )
class Station(models.Model):
    """A station of a network, identified by its code within that network."""

    code = models.CharField(max_length=8)
    network = models.ForeignKey(Network, on_delete=models.CASCADE)
    start_date = models.DateTimeField(null=True, blank=True)
    end_date = models.DateTimeField(null=True, blank=True)
    triggered = models.BooleanField(default=False)

    objects = StationManager.from_queryset(StationQuerySet)()

    class Meta:
        unique_together = (("code", "network"),)

    def __str__(self) -> str:
        """Return ``<network>.<code>``."""
        return ".".join((str(self.network), str(self.code)))

    def get_absolute_url(self) -> str:
        """Return the URL of the ``quality:station-detail`` view of this station."""
        url_kwargs = {"pk": self.pk}
        return reverse("quality:station-detail", kwargs=url_kwargs)
class OperatorManager(models.Manager["Operator"]):
    """Manager able to synchronise ``Operator`` rows with a station inventory."""

    def populate(self, inventory: Inventory) -> None:
        """Insert the operators of ``inventory`` or update their website.

        One ``Operator`` is stored per distinct agency. The website is taken
        from the inventory when one is available; otherwise it is left blank.

        Args:
            inventory: Inventory whose station operators are read.
        """
        max_length = Operator._meta.get_field("website").max_length
        # agency -> website; the first non-empty website of an agency wins.
        websites: dict[str, str] = {}
        for network in inventory.networks:
            for station in network.stations:
                for operator in station.operators:
                    # NOTE(review): assumes inventory operators expose an
                    # optional ``website`` attribute -- confirm.
                    website = getattr(operator, "website", None) or ""
                    if max_length is not None and len(website) > max_length:
                        # Would not fit in the column: store no website
                        # rather than failing the whole import.
                        website = ""
                    if not websites.get(operator.agency):
                        websites[operator.agency] = website
        operators = [
            Operator(agency=agency, website=website)
            for agency, website in websites.items()
        ]
        Operator.objects.bulk_create(
            operators,
            update_conflicts=True,
            update_fields=["website"],
            unique_fields=["agency"],
        )

    def link_to_stations(self, inventory: Inventory) -> None:
        """Replace every station/operator link by those found in ``inventory``.

        Existing links are deleted and the new ones created within a single
        transaction, so a failure leaves the previous links untouched.

        Args:
            inventory: Inventory whose station operators are read.

        Raises:
            KeyError: If the inventory references a network, station or
                operator that is not already stored in the database.
        """
        from django.db import transaction

        operators = {
            operator.agency: operator.pk
            for operator in Operator.objects.only("pk", "agency")
        }
        stations = {
            network.code: {
                station.code: station.pk for station in network.station_set.all()
            }
            for network in Network.objects.prefetch_related("station_set").only(
                "code", "station__code"
            )
        }
        relationship = Station.operators.through
        # ``dict.fromkeys`` drops duplicated (station, operator) pairs, which
        # the link table would reject, while keeping the inventory order.
        pairs = dict.fromkeys(
            (stations[network.code][station.code], operators[operator.agency])
            for network in inventory.networks
            for station in network.stations
            for operator in station.operators
        )
        relations = [
            relationship(station_id=station_id, operator_id=operator_id)
            for station_id, operator_id in pairs
        ]
        with transaction.atomic():
            relationship.objects.all().delete()
            relationship.objects.bulk_create(relations)

    def purge_obsoletes(self) -> None:
        """Delete the operators that are not linked to any station."""
        self.filter(stations__isnull=True).delete()
class OperatorQuerySet(models.QuerySet["Operator"]):
    """Chainable filters and helpers for ``Operator`` querysets."""

    def acronyms_only(self) -> list[str]:
        """Return one label per operator, preferring a parenthesised acronym.

        When the agency contains an opening parenthesis, the label is the
        upper-cased text following the first one, cut at the next closing
        parenthesis. Otherwise the agency is returned unchanged.
        """
        labels = []
        for operator in self:
            agency = operator.agency
            if "(" not in agency:
                labels.append(agency)
                continue
            after_parenthesis = agency.upper().split("(")[1]
            labels.append(after_parenthesis.split(")")[0])
        return labels

    def checks_exist(self) -> models.QuerySet["Operator"]:
        """Keep, without duplicates, operators having a station channel with checks."""
        matching_checks = Check.objects.filter(channel=OuterRef("stations__channel"))
        with_checks = self.filter(Exists(matching_checks))
        return with_checks.distinct()
class Operator(models.Model):
    """An agency operating one or several stations."""

    agency = models.CharField(max_length=255, unique=True)
    website = models.URLField(blank=True)
    # Reachable from ``Station`` through the ``operators`` reverse accessor.
    stations = models.ManyToManyField(Station, related_name="operators")

    objects = OperatorManager.from_queryset(OperatorQuerySet)()

    def __str__(self) -> str:
        """Return the agency name."""
        agency_name = self.agency
        return agency_name
# Maximum number of retries for each check; ``CheckQuerySet.failed`` only
# returns checks whose ``retries`` counter is below this value.
CHECK_MAX_RETRIES: int = 3

# Weight associated with each check result.
# NOTE(review): how these weights are consumed is not visible in this
# module -- confirm against callers.
RESULT_PONDERATION: dict[Check.Result, int] = {
    Check.Result.NO_DATA: 50,
    Check.Result.NOT_READABLE: 100,
    Check.Result.DECONVOLUTION_FAILS: 150,
    Check.Result.DECONVOLUTION_PASS: 255,
    Check.Result.CHANNEL_CLOSED: 0,
}