Source code for instaseis.database_interfaces.forward_instaseis_db

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Python library to extract seismograms from a set of wavefields generated by
AxiSEM.

:copyright:
    Martin van Driel (Martin@vanDriel.de), 2020
    Lion Krischer (lion.krischer@gmail.com), 2020
:license:
    GNU Lesser General Public License, Version 3 [non-commercial/academic use]
    (http://www.gnu.org/copyleft/lgpl.html)
"""
import collections
import numpy as np

from .base_netcdf_instaseis_db import BaseNetCDFInstaseisDB
from . import mesh
from .. import rotations
from ..source import Source


[docs]class ForwardInstaseisDB(BaseNetCDFInstaseisDB): """ Forward Instaseis database. """ def __init__( self, db_path, netcdf_files, buffer_size_in_mb=100, read_on_demand=False, *args, **kwargs, ): """ :param db_path: Path to the Instaseis Database containing subdirectories PZ and/or PX each containing a ``order_output.nc4`` file. :type db_path: str :param buffer_size_in_mb: Strain and displacement are buffered to avoid repeated disc access. Depending on the type of database and the number of components of the database, the total buffer memory can be up to four times this number. The optimal value is highly application and system dependent. :type buffer_size_in_mb: int, optional :param read_on_demand: Read several global fields on demand (faster initialization) or on initialization (slower initialization, faster in individual seismogram extraction, useful e.g. for finite sources, default). :type read_on_demand: bool, optional """ BaseNetCDFInstaseisDB.__init__( self, db_path=db_path, buffer_size_in_mb=buffer_size_in_mb, read_on_demand=read_on_demand, *args, **kwargs, ) self._parse_meshes(netcdf_files) def _parse_meshes(self, files): m1_m = mesh.Mesh( files["MZZ"], full_parse=True, strain_buffer_size_in_mb=0, displ_buffer_size_in_mb=self.buffer_size_in_mb, read_on_demand=self.read_on_demand, ) m2_m = mesh.Mesh( files["MXX_P_MYY"], full_parse=False, strain_buffer_size_in_mb=0, displ_buffer_size_in_mb=self.buffer_size_in_mb, read_on_demand=self.read_on_demand, ) m3_m = mesh.Mesh( files["MXZ_MYZ"], full_parse=False, strain_buffer_size_in_mb=0, displ_buffer_size_in_mb=self.buffer_size_in_mb, read_on_demand=self.read_on_demand, ) m4_m = mesh.Mesh( files["MXY_MXX_M_MYY"], full_parse=False, strain_buffer_size_in_mb=0, displ_buffer_size_in_mb=self.buffer_size_in_mb, read_on_demand=self.read_on_demand, ) self.parsed_mesh = m1_m MeshCollection_fwd = collections.namedtuple( "MeshCollection_fwd", ["m1", "m2", "m3", "m4"] ) self.meshes = MeshCollection_fwd(m1_m, m2_m, m3_m, m4_m) self._is_reciprocal = False def _get_data( self, source, receiver, components, coordinates, element_info ): ei = element_info # Collect data arrays and mu in a dictionary. data = {} mesh = self.parsed_mesh.f["Mesh"] # Get mu. if not self.read_on_demand: mesh_mu = self.parsed_mesh.mesh_mu else: mesh_mu = mesh["mesh_mu"] npol = self.info.spatial_order data["mu"] = mesh_mu[ei.gll_point_ids[npol // 2, npol // 2]] if not isinstance(source, Source): raise NotImplementedError if self.info.dump_type != "displ_only": raise NotImplementedError displ_1 = self._get_displacement( self.meshes.m1, ei.id_elem, ei.gll_point_ids, ei.col_points_xi, ei.col_points_eta, ei.xi, ei.eta, ) displ_2 = self._get_displacement( self.meshes.m2, ei.id_elem, ei.gll_point_ids, ei.col_points_xi, ei.col_points_eta, ei.xi, ei.eta, ) displ_3 = self._get_displacement( self.meshes.m3, ei.id_elem, ei.gll_point_ids, ei.col_points_xi, ei.col_points_eta, ei.xi, ei.eta, ) displ_4 = self._get_displacement( self.meshes.m4, ei.id_elem, ei.gll_point_ids, ei.col_points_xi, ei.col_points_eta, ei.xi, ei.eta, ) mij = source.tensor / self.parsed_mesh.amplitude # mij is [m_rr, m_tt, m_pp, m_rt, m_rp, m_tp] # final is in s, phi, z coordinates final = np.zeros((displ_1.shape[0], 3), dtype="float64") final[:, 0] += displ_1[:, 0] * mij[0] final[:, 2] += displ_1[:, 2] * mij[0] final[:, 0] += displ_2[:, 0] * (mij[1] + mij[2]) final[:, 2] += displ_2[:, 2] * (mij[1] + mij[2]) fac_1 = mij[3] * np.cos(coordinates.phi) + mij[4] * np.sin( coordinates.phi ) fac_2 = -mij[3] * np.sin(coordinates.phi) + mij[4] * np.cos( coordinates.phi ) final[:, 0] += displ_3[:, 0] * fac_1 final[:, 1] += displ_3[:, 1] * fac_2 final[:, 2] += displ_3[:, 2] * fac_1 fac_1 = (mij[1] - mij[2]) * np.cos(2 * coordinates.phi) + 2 * mij[ 5 ] * np.sin(2 * coordinates.phi) fac_2 = -(mij[1] - mij[2]) * np.sin(2 * coordinates.phi) + 2 * mij[ 5 ] * np.cos(2 * coordinates.phi) final[:, 0] += displ_4[:, 0] * fac_1 final[:, 1] += displ_4[:, 1] * fac_2 final[:, 2] += displ_4[:, 2] * fac_1 rotmesh_colat = np.arctan2(coordinates.s, coordinates.z) if "T" in components: # need the - for consistency with reciprocal mode, # need external verification still data["T"] = -final[:, 1] if "R" in components: data["R"] = final[:, 0] * np.cos(rotmesh_colat) - final[ :, 2 ] * np.sin(rotmesh_colat) if "N" in components or "E" in components or "Z" in components: # transpose needed because rotations assume different slicing # (ugly) final = rotations.rotate_vector_src_to_NEZ( final.T, coordinates.phi, source.longitude_rad, source.colatitude_rad, receiver.longitude_rad, receiver.colatitude_rad, ).T if "N" in components: data["N"] = final[:, 0] if "E" in components: data["E"] = final[:, 1] if "Z" in components: data["Z"] = final[:, 2] return data