"""
Velocity Autocorrelation Function --- :mod:`transport_analysis.velocityautocorr`
================================================================================
This module offers a class to efficiently calculate a velocity
autocorrelation function (VACF). Averaging over all atoms in the atom group
``ag``, regardless of type, it will calculate
.. math::
C(j \Delta t) = {1 \over N - j} \sum_{i=0}^{N-1-j}
v(i \Delta t)v((i+j)\Delta t)
where :math:`N` is the number of time frames and :math:`\Delta t` are
discrete time intervals between data points.
Basic usage
-----------
This example uses the files :data:`~MDAnalysis.tests.datafiles.PRM_NCBOX` and
:data:`~MDAnalysis.tests.datafiles.TRJ_NCBOX` from the MDAnalysis test suite.
To get started, execute ::
# imports
>>> import MDAnalysis as mda
>>> from transport_analysis.velocityautocorr import VelocityAutocorr
# test data for this example
>>> from MDAnalysis.tests.datafiles import PRM_NCBOX, TRJ_NCBOX
We will calculate the VACF of an atom group of all water atoms in
residues 1-5. To select these atoms:
>>> u = mda.Universe(PRM_NCBOX, TRJ_NCBOX)
>>> ag = u.select_atoms("resname WAT and resid 1-5")
We can run the calculation using any variable of choice such as
``wat_vacf`` and access our results using ``wat_vacf.results.timeseries``:
>>> wat_vacf = VelocityAutocorr(ag).run()
>>> wat_vacf.results.timeseries
array([275.62075467, -18.42008255, -23.94383428, 41.41415381,
-2.3164344 , -35.66393559, -22.66874897, -3.97575003,
6.57888933, -5.29065096])
Notice that this example data is insufficient to provide a well-defined VACF.
When working with real data, ensure that the frames are captured frequently
enough to obtain a VACF suitable for your needs.
"""
from typing import TYPE_CHECKING
from MDAnalysis.analysis.base import AnalysisBase
from MDAnalysis.core.groups import UpdatingAtomGroup
from MDAnalysis.exceptions import NoDataError
import numpy as np
import tidynamics
import matplotlib.pyplot as plt
from scipy import integrate
from transport_analysis.due import due, Doi
if TYPE_CHECKING:
from MDAnalysis.core.universe import AtomGroup
due.cite(
Doi("10.21105/joss.00877"),
description="Autocorrelation with tidynamics",
path="transport_analysis.velocityautocorr",
cite_module=True,
)
[docs]class VelocityAutocorr(AnalysisBase):
"""
Class to calculate a velocity autocorrelation function (VACF).
Parameters
----------
atomgroup : AtomGroup
An MDAnalysis :class:`~MDAnalysis.core.groups.AtomGroup`.
Note that :class:`UpdatingAtomGroup` instances are not accepted.
dim_type : {'xyz', 'xy', 'yz', 'xz', 'x', 'y', 'z'}
Desired dimensions to be included in the VACF. Defaults to 'xyz'.
fft : bool
If ``True``, uses a fast FFT based algorithm for computation of
the VACF. Otherwise, use the simple "windowed" algorithm.
The tidynamics package is required for `fft=True`.
Defaults to ``True``.
Attributes
----------
atomgroup : :class:`~MDAnalysis.core.groups.AtomGroup`
The atoms to which this analysis is applied
dim_fac : int
Dimensionality :math:`d` of the VACF.
results.timeseries : :class:`numpy.ndarray`
The averaged VACF over all the particles with respect to lag-time.
Obtained after calling :meth:`VelocityAutocorr.run`
results.vacf_by_particle : :class:`numpy.ndarray`
The VACF of each individual particle with respect to lag-time.
start : Optional[int]
The first frame of the trajectory used to compute the analysis.
stop : Optional[int]
The frame to stop at for the analysis, non-inclusive.
step : Optional[int]
Number of frames to skip between each analyzed frame.
n_frames : int
Number of frames analysed in the trajectory.
n_particles : int
Number of particles VACF was calculated over.
"""
def __init__(
self,
atomgroup: "AtomGroup",
dim_type: str = "xyz",
fft: bool = True,
**kwargs,
) -> None:
# the below line must be kept to initialize the AnalysisBase class!
super().__init__(atomgroup.universe.trajectory, **kwargs)
# after this you will be able to access `self.results`
# `self.results` is a dictionary-like object
# that can should used to store and retrieve results
# See more at the MDAnalysis documentation:
# https://docs.mdanalysis.org/stable/documentation_pages/analysis/base.html?highlight=results#MDAnalysis.analysis.base.Results
if isinstance(atomgroup, UpdatingAtomGroup):
raise TypeError(
"UpdatingAtomGroups are not valid for VACF computation"
)
# args
self.dim_type = dim_type.lower()
self._dim, self.dim_fac = self._parse_dim_type(self.dim_type)
self.fft = fft
# local
self.atomgroup = atomgroup
self.n_particles = len(self.atomgroup)
self._run_called = False
def _prepare(self):
"""Set up velocity and VACF arrays before the analysis loop begins"""
# 2D array of frames x particles
self.results.vacf_by_particle = np.zeros(
(self.n_frames, self.n_particles)
)
# 3D array of frames x particles x dimensionality
self._velocities = np.zeros(
(self.n_frames, self.n_particles, self.dim_fac)
)
# self.results.timeseries not set here
@staticmethod
def _parse_dim_type(dim_str):
"""Sets up the desired dimensionality of the VACF."""
keys = {
"x": [0],
"y": [1],
"z": [2],
"xy": [0, 1],
"xz": [0, 2],
"yz": [1, 2],
"xyz": [0, 1, 2],
}
try:
_dim = keys[dim_str]
except KeyError:
raise ValueError(
"invalid dim_type: {} specified, please specify one of xyz, "
"xy, xz, yz, x, y, z".format(dim_str)
)
return _dim, len(_dim)
def _single_frame(self):
"""Constructs array of velocities for VACF calculation."""
# This runs once for each frame of the trajectory
# The trajectory positions update automatically
# You can access the frame number using self._frame_index
# trajectory must have velocity information
if not self._ts.has_velocities:
raise NoDataError(
"VACF computation requires velocities in the trajectory"
)
# set shape of velocity array
self._velocities[self._frame_index] = self.atomgroup.velocities[
:, self._dim
]
# Results will be in units of (angstroms / ps)^2
def _conclude(self):
"""Calculate the final results of the analysis"""
# This is an optional method that runs after
# _single_frame loops over the trajectory.
# It is useful for calculating the final results
# of the analysis.
if self.fft:
self._conclude_fft()
else:
self._conclude_simple()
def _conclude_fft(self): # with FFT, np.float64 bit prescision required.
r"""Calculates the VACF via the FCA fast correlation algorithm."""
for n in range(self.n_particles):
self.results.vacf_by_particle[:, n] = tidynamics.acf(
self._velocities[:, n, :]
)
self.results.timeseries = self.results.vacf_by_particle.mean(axis=1)
self._run_called = True
def _conclude_simple(self):
r"""Calculates the VACF via the simple "windowed" algorithm."""
# total frames in trajectory, use N for readability
N = self.n_frames
# iterate through all possible lagtimes up to N
for lag in range(N):
# get product of velocities shifted by "lag" frames
veloc = (
self._velocities[: N - lag, :, :]
* self._velocities[lag:, :, :]
)
# dot product of x(, y, z) velocities per particle
sum_veloc = np.sum(veloc, axis=-1)
# average over # frames
# update VACF by particle array
self.results.vacf_by_particle[lag, :] = np.mean(sum_veloc, axis=0)
# average over # particles and update results array
self.results.timeseries = self.results.vacf_by_particle.mean(axis=1)
self._run_called = True
[docs] def plot_vacf(
self,
start=0,
stop=0,
step=1,
xlabel="Time (ps)",
ylabel="Velocity Autocorrelation Function (Å^2 / ps^2)",
):
"""
Returns a velocity autocorrelation function (VACF) plot via
``Matplotlib``. Analysis must be run prior to plotting.
Parameters
----------
start : Optional[int]
The first frame of ``self.results.timeseries``
used for the plot.
stop : Optional[int]
The frame of ``self.results.timeseries`` to stop at
for the plot, non-inclusive.
step : Optional[int]
Number of frames to skip between each plotted frame.
xlabel : Optional[str]
The x-axis label text. Defaults to "Time (ps)".
ylabel : Optional[str]
The y-axis label text.
Defaults to "Velocity Autocorrelation Function (Å^2 / ps^2)".
Returns
-------
:class:`matplotlib.lines.Line2D`
A :class:`matplotlib.lines.Line2D` instance with
the desired VACF plotting information.
"""
if not self._run_called:
raise RuntimeError("Analysis must be run prior to plotting")
stop = self.n_frames if stop == 0 else stop
fig, ax_vacf = plt.subplots()
ax_vacf.set_xlabel(xlabel)
ax_vacf.set_ylabel(ylabel)
return ax_vacf.plot(
self.times[start:stop:step],
self.results.timeseries[start:stop:step],
)
[docs] def self_diffusivity_gk(self, start=0, stop=0, step=1):
"""
Returns a self-diffusivity value using ``scipy.integrate.trapezoid``.
Analysis must be run prior to computing self-diffusivity.
Parameters
----------
start : Optional[int]
The first frame of ``self.results.timeseries``
used for the calculation.
stop : Optional[int]
The frame of ``self.results.timeseries`` to stop at
for the calculation, non-inclusive.
step : Optional[int]
Number of frames to skip between each frame used
for the calculation.
Returns
-------
`numpy.float64`
The calculated self-diffusivity value for the analysis.
"""
if not self._run_called:
raise RuntimeError(
"Analysis must be run prior to computing self-diffusivity"
)
stop = self.n_frames if stop == 0 else stop
return (
integrate.trapezoid(
self.results.timeseries[start:stop:step],
self.times[start:stop:step],
)
/ self.dim_fac
)
[docs] def self_diffusivity_gk_odd(self, start=0, stop=0, step=1):
"""
Returns a self-diffusivity value using ``scipy.integrate.simpson``.
Recommended for use with an odd number of evenly spaced data points.
Analysis must be run prior to computing self-diffusivity.
Parameters
----------
start : Optional[int]
The first frame of ``self.results.timeseries``
used for the calculation.
stop : Optional[int]
The frame of ``self.results.timeseries`` to stop at
for the calculation, non-inclusive.
step : Optional[int]
Number of frames to skip between each frame used
for the calculation.
Returns
-------
`numpy.float64`
The calculated self-diffusivity value for the analysis.
"""
if not self._run_called:
raise RuntimeError(
"Analysis must be run prior to computing self-diffusivity"
)
stop = self.n_frames if stop == 0 else stop
return (
integrate.simpson(
y=self.results.timeseries[start:stop:step],
x=self.times[start:stop:step],
)
/ self.dim_fac
)
[docs] def plot_running_integral(
self,
start=0,
stop=0,
step=1,
initial=0,
xlabel="Time (ps)",
ylabel="Running Integral of the VACF (Å^2 / ps)",
):
"""
Returns a plot of the running integral of the
velocity autocorrelation function (VACF) via ``Matplotlib``.
In this case, the running integral is the integral of the VACF
divided by the dimensionality. Analysis must be run prior to plotting.
Parameters
----------
start : Optional[int]
The first frame of ``self.results.timeseries``
used for the plot.
stop : Optional[int]
The frame of ``self.results.timeseries`` to stop at
for the plot, non-inclusive.
step : Optional[int]
Number of frames to skip between each plotted frame.
initial : Optional[float]
Inserted value at the beginning of the integrated result array.
Defaults to 0.
xlabel : Optional[str]
The x-axis label text. Defaults to "Time (ps)".
ylabel : Optional[str]
The y-axis label text.
Defaults to "Running Integral of the VACF (Å^2 / ps)".
Returns
-------
:class:`matplotlib.lines.Line2D`
A :class:`matplotlib.lines.Line2D` instance with
the desired VACF plotting information.
"""
if not self._run_called:
raise RuntimeError("Analysis must be run prior to plotting")
stop = self.n_frames if stop == 0 else stop
running_integral = (
integrate.cumulative_trapezoid(
self.results.timeseries[start:stop:step],
self.times[start:stop:step],
initial=initial,
)
/ self.dim_fac
)
fig, ax_running_integral = plt.subplots()
ax_running_integral.set_xlabel(xlabel)
ax_running_integral.set_ylabel(ylabel)
return ax_running_integral.plot(
self.times[start:stop:step],
running_integral,
)