Coverage for quality/tasks.py: 56%
157 statements
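
"""Celery tasks for WaveQC.

Fetch station metadata and one day of waveform data per channel from an FDSN
web service, run quality checks on the downloaded data and store the results
as Check records.
"""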
import math
import platform
from http.client import IncompleteRead
from pathlib import Path

import pendulum
from celery import chain, group, shared_task
from celery.app.task import Task
from celery.utils.log import get_task_logger
from django.conf import settings
from obspy import UTCDateTime, read
from obspy import __version__ as obspy_version
from obspy.clients.fdsn import Client
from obspy.clients.fdsn.client import (
    FDSNBadRequestException,
    FDSNException,
    FDSNNoDataException,
)
from obspy.clients.fdsn.header import platform_
from obspy.core.inventory import read_inventory
from obspy.core.stream import Stream
from obspy.io.mseed import InternalMSEEDError

from .models import (
    Channel,
    Check,
    Network,
    Operator,
    Station,
)

logger = get_task_logger(__name__)

# We request waveforms for 1 day (86400 seconds)
CHECK_TIME_REACH = 86400

# Used when decimating waveforms
TARGET_SAMPLING_RATE = 5
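# e.g. a 100 Hz high frequency channel gets a decimation factor of
# int(100 / TARGET_SAMPLING_RATE) = 20 in check_removable_response (illustrative)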

# Channel naming
# See : https://www.fdsn.org/pdf/SEEDManual_V2.4.pdf p. 133
BAND_CODE = 0
INSTRUMENT_CODE = 1

# High frequency channels according to channel naming's band code
HIGH_FREQUENCY_CHANNELS = "FGDCESHB"

# Seismometer channels according to channel naming's instrument code
SEISMOMETER_CHANNELS = "HLGMN"
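
# Example (illustrative): for a channel code such as "HHZ", channel[BAND_CODE]
# is "H" (a high frequency band, so the trace gets decimated) and
# channel[INSTRUMENT_CODE] is "H" (a seismometer, so the response is removed).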


def get_obspy_client() -> Client:
    user_agent = (
        f"WaveQC (ObsPy/{obspy_version}, {platform_}, "
        f"Python {platform.python_version()})"
    )
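    # e.g. "WaveQC (ObsPy/1.4.1, Linux-6.1.0-amd64-x86_64-with-glibc2.36,
    # Python 3.12.2)" (illustrative values)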
    return Client(
        settings.WAVEQC_FDSN_CLIENT, user_agent=user_agent, _discover_services=False
    )


@shared_task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def update_inventory(date: str) -> None:
    client = get_obspy_client()
    networks = ",".join([network.code for network in Network.objects.only("code")])
    inventory = client.get_stations(network=networks, level="station")
    # We need to update all stations first to get an end_date if available
    Network.objects.populate(inventory)
    Station.objects.populate(inventory)
    Operator.objects.populate(inventory)
    Operator.objects.link_to_stations(inventory)
    Operator.objects.purge_obsoletes()
    target = UTCDateTime(date)
    inventory = client.get_stations(
        network=networks, level="channel", startbefore=target
    )
    Channel.objects.populate(inventory)


@shared_task(autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True)
def build_station_list(date: str) -> list[str]:
    day = pendulum.parse(date)
    stations = Station.objects.opened(day=day).select_related("network")  # type: ignore[arg-type]
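    # Stations are serialised as "<network>_<station>" strings (e.g. "XX_STA01",
    # illustrative) so they can travel through the Celery canvas;
    # download_stationxml splits them back apart.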
    return [f"{station.network.code}_{station.code}" for station in stations]


@shared_task(ignore_result=True)
def download_stationxml(station: str, date: str) -> None:
    client = get_obspy_client()
    target = UTCDateTime(date)
    network, station = station.split("_")
    try:
        client.get_stations(
            network=network,
            station=station,
            level="response",
            startbefore=target,
            endafter=target,
            filename=(
                f"{settings.WAVEQC_STATIONXML_STORAGE_PATH}/{network}_{station}.xml"
            ),
        )
    except FDSNNoDataException as _:
        pass
    except FDSNBadRequestException as _:
        logger.warning("Bad request : %s", station)
    except (FDSNException, IncompleteRead, ValueError) as _:
        logger.warning("Web Service is not responding : %s", station)
    # This last exception shouldn't be here. ObsPy should return one of its exceptions.
    # Once ObsPy 1.5.0 is released, remove this.
    except AttributeError as _:
        logger.warning("Web Service is not responding : %s", station)


@shared_task(bind=True)
def download_stationxmls(self: Task, stations: list[str], date: str) -> object:
    return self.replace(
        group([download_stationxml.si(station, date) for station in stations]),
    )


@shared_task(autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True)
def build_check_list(date: str) -> list[dict[str, str]]:
    day = pendulum.parse(date)
    channels = Channel.objects.opened(day=day).select_related("station__network")  # type: ignore[arg-type]
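    # Each check is serialised as a plain dict of strings, e.g. (illustrative):
    # {"channel": "HHZ", "location": "00", "station": "STA01", "network": "XX",
    #  "date": "2025-03-25"}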
    return [
        {
            "channel": channel.code,
            "location": channel.location,
            "station": channel.station.code,
            "network": channel.station.network.code,
            "date": date,
        }
        for channel in channels
    ]


@shared_task(autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True)
def get_failed_checks() -> list[dict[str, object]]:
    failed_checks = Check.objects.failed().select_related("channel__station__network")
    return [
        {
            "channel": check.channel.code,
            "location": check.channel.location,
            "station": check.channel.station.code,
            "network": check.channel.station.network.code,
            "date": check.date,
        }
        for check in failed_checks
    ]


@shared_task
def download_mseed(check: dict[str, str]) -> str:
    client = get_obspy_client()
    start = UTCDateTime(check["date"])
    end = start + CHECK_TIME_REACH
    filename = (
        f"{settings.WAVEQC_MSEED_STORAGE_PATH}/"
        f"{check['network']}."
        f"{check['station']}."
        f"{check['location']}."
        f"{check['channel']}__"
        f"{start.strftime('%Y%m%dT%H%M%SZ')}__"
        f"{end.strftime('%Y%m%dT%H%M%SZ')}"
        ".mseed"
    )
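    # e.g. XX.STA01.00.HHZ__20250325T000000Z__20250326T000000Z.mseed (illustrative);
    # check_item later parses this name back into its components.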

    try:
        client.get_waveforms(
            check["network"],
            check["station"],
            check["location"],
            check["channel"],
            start,
            end,
            filename=filename,
        )
    except FDSNNoDataException as _:
        pass
    except FDSNBadRequestException as _:
        logger.warning(
            "Bad request : %s",
            ".".join([str(value) for value in check.values()]),
        )
    except (FDSNException, IncompleteRead, ValueError) as _:
        logger.warning(
            "Web Service is not responding : %s",
            ".".join([str(value) for value in check.values()]),
        )
    return filename


def check_completeness(stream: Stream) -> int:
    # Total number of samples converted to a data duration in milliseconds,
    # using the sampling rate of the first trace.
    datapoints = sum(trace.count() for trace in stream.traces)
    return int((datapoints - 1) * 1000 / stream.traces[0].stats.sampling_rate)


def check_shortest_trace(stream: Stream) -> int:
    # Length of the shortest trace, rounded up to whole seconds.
    sampling_rate = stream.traces[0].stats.sampling_rate
    shortest_trace = min(len(trace) for trace in stream.traces) - 1
    return math.ceil(shortest_trace / sampling_rate)  # type: ignore[no-any-return]


def check_removable_response(
    stream: Stream,
    channel: str,
    network: str,
    station: str,
    count: int,
) -> Check.Result:
    if count > 1:
        try:
            stream.merge(fill_value=0)
        except Exception as message:  # noqa: BLE001
            logger.warning(
                "Channel %s.%s.%s has traces with different sampling rates : %s",
                network,
                station,
                channel,
                message,
            )
            return Check.Result.DECONVOLUTION_FAILS
    trace = stream.traces[0]

    # If the instrument is environmental (RWD, etc.), do not try to remove the response
    if channel[INSTRUMENT_CODE] in SEISMOMETER_CHANNELS:
        # decimate the trace only for high frequency channels
        if channel[BAND_CODE] in HIGH_FREQUENCY_CHANNELS:
            decimation_factor = int(trace.stats.sampling_rate / TARGET_SAMPLING_RATE)
            trace.decimate(decimation_factor, no_filter=True)

        try:
            inventory = read_inventory(
                f"{settings.WAVEQC_STATIONXML_STORAGE_PATH}/{network}_{station}.xml",
            )
            trace.remove_response(inventory=inventory)
        except (ValueError, FileNotFoundError, TypeError):
            result = Check.Result.DECONVOLUTION_FAILS
        else:
            result = Check.Result.DECONVOLUTION_PASS
    else:
        result = Check.Result.DECONVOLUTION_PASS
    return result


@shared_task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def check_item(mseed: str) -> None:
    completeness = count = shortest_trace = 0
    file = Path(mseed)
    nslc, start, _ = file.name.split("__")
    network, station, _, channel = nslc.split(".")
    date = pendulum.parse(start[:8], exact=True)
    try:
        stream = read(file)
    except (TypeError, InternalMSEEDError):
        result = Check.Result.NOT_READABLE
    except FileNotFoundError:
        result = Check.Result.NO_DATA
    else:
        stream._cleanup()  # noqa: SLF001
        completeness = check_completeness(stream)
        count = len(stream.traces)
        shortest_trace = check_shortest_trace(stream)
        result = check_removable_response(stream, channel, network, station, count)
    file.unlink(missing_ok=True)
    Check.objects.store(
        nslc,
        date,  # type: ignore[arg-type]
        result,
        completeness,
        count,
        shortest_trace,
    )


@shared_task(bind=True)
def run_checks(self: Task, checks: list[dict[str, object]]) -> object:
    return self.replace(
        group([chain(download_mseed.si(check), check_item.s()) for check in checks]),
    )


@shared_task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def remove_stationxmls() -> None:
    basepath = Path(settings.WAVEQC_STATIONXML_STORAGE_PATH)
    for entry in basepath.iterdir():
        entry.unlink()


@shared_task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def fix_closed_channels_checks() -> None:
    Check.objects.fix_closed_channels()


@shared_task(
    ignore_result=True, autoretry_for=(Exception,), retry_backoff=10, retry_jitter=True
)
def fix_missing_checks() -> None:
    Check.objects.fix_missing()


@shared_task(ignore_result=True)
def retry_failing_checks() -> None:
    date = pendulum.yesterday("utc").to_date_string()
    chain(
        update_inventory.si(date),
        build_station_list.si(date),
        download_stationxmls.s(date),
        get_failed_checks.si(),
        run_checks.s(),
        remove_stationxmls.si(),
        fix_closed_channels_checks.si(),
        fix_missing_checks.si(),
    )()


@shared_task(ignore_result=True)
def launch_checks() -> None:
    # Daily pipeline: refresh the inventory, fetch StationXML files, build the
    # day's check list, run the checks, then clean up.
    date = pendulum.yesterday("utc").to_date_string()
    chain(
        update_inventory.si(date),
        build_station_list.si(date),
        download_stationxmls.s(date),
        build_check_list.si(date),
        run_checks.s(),
        remove_stationxmls.si(),
        fix_closed_channels_checks.si(),
        fix_missing_checks.si(),
    )()