# This file is part of MSMBuilder.
#
# Copyright 2011 Stanford University
#
# MSMBuilder is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
"""
Functions for performing Transition Path Theory calculations.
Written and maintained by TJ Lane <tjlane@stanford.edu>
Contributions from Kyle Beauchamp, Robert McGibbon, Vince Voelz
To Do:
-- Get stuff passing unit tests
"""
import numpy as np
import scipy.sparse
from msmbuilder import MSMLib
from msmbuilder import msm_analysis
from msmbuilder.utils import deprecated
import logging
logger = logging.getLogger(__name__)
# turn on debugging printout
# logger.setLogLevel(logging.DEBUG)
###############################################################################
# Typechecking/Utility Functions
#
def _ensure_iterable(arg):
if not hasattr(arg, '__iter__'):
arg = list([int(arg)])
logger.debug("Passed object was not iterable,"
" converted it to: %s" % str(arg))
assert hasattr(arg, '__iter__')
return arg
def _check_sources_sinks(sources, sinks):
sources = _ensure_iterable(sources)
sinks = _ensure_iterable(sinks)
if np.any(sources == sinks):
raise ValueError("Sets `sources` and `sinks` must be disjoint "
"to find paths between them")
return sources, sinks
###############################################################################
# Path Finding Functions
#
[docs]def find_top_paths(sources, sinks, tprob, num_paths=10, node_wipe=False, net_flux=None):
r"""
Calls the Dijkstra algorithm to find the top 'NumPaths'.
Does this recursively by first finding the top flux path, then cutting that
path and relaxing to find the second top path. Continues until NumPaths
have been found.
Parameters
----------
sources : array_like, int
The indices of the source states
sinks : array_like, int
Indices of sink states
num_paths : int
The number of paths to find
Returns
-------
Paths : list of lists
The nodes transversed in each path
Bottlenecks : list of tuples
The nodes between which exists the path bottleneck
Fluxes : list of floats
The flux through each path
Optional Parameters
-------------------
node_wipe : bool
If true, removes the bottleneck-generating node from the graph, instead
of just the bottleneck (not recommended, a debugging functionality)
net_flux : sparse matrix
Matrix of the net flux from `sources` to `sinks`, see function `net_flux`.
If not provided, is calculated from scratch. If provided, `tprob` is
ignored.
To Do
-----
-- Add periodic flow check
"""
# first, do some checking on the input, esp. `sources` and `sinks`
# we want to make sure all objects are iterable and the sets are disjoint
sources, sinks = _check_sources_sinks(sources, sinks)
msm_analysis.check_transition(tprob)
# check to see if we get net_flux for free, otherwise calculate it
if not net_flux:
net_flux = calculate_net_fluxes(sources, sinks, tprob)
# initialize objects
paths = []
fluxes = []
bottlenecks = []
if scipy.sparse.issparse(net_flux):
net_flux = net_flux.tolil()
# run the initial Dijkstra pass
pi, b = Dijkstra(sources, sinks, net_flux)
logger.info("Path Num | Path | Bottleneck | Flux")
i = 1
done = False
while not done:
# First find the highest flux pathway
(path, (b1, b2), flux) = _backtrack(sinks, b, pi, net_flux)
# Add each result to a Paths, Bottlenecks, Fluxes list
if flux == 0:
logger.info("Only %d possible pathways found. Stopping backtrack.", i)
break
paths.append(path)
bottlenecks.append((b1, b2))
fluxes.append(flux)
logger.info("%s | %s | %s | %s ", i, path, (b1, b2), flux)
# Cut the bottleneck, start relaxing from B side of the cut
if node_wipe:
net_flux[:, b2] = 0
logger.info("Wiped node: %s", b2)
else:
net_flux[b1, b2] = 0
G = scipy.sparse.find(net_flux)
Q = [b2]
b, pi, net_flux = _back_relax(b2, b, pi, net_flux)
# Then relax the graph and repeat
# But only if we still need to
if i != num_paths - 1:
while len(Q) > 0:
w = Q.pop()
for v in G[1][np.where(G[0] == w)]:
if pi[v] == w:
b, pi, net_flux = _back_relax(v, b, pi, net_flux)
Q.append(v)
Q = sorted(Q, key=lambda v: b[v])
i += 1
if i == num_paths + 1:
done = True
if flux == 0:
logger.info("Only %d possible pathways found. Stopping backtrack.", i)
done = True
return paths, bottlenecks, fluxes
[docs]def Dijkstra(sources, sinks, net_flux):
r""" A modified Dijkstra algorithm that dynamically computes the cost
of all paths from A to B, weighted by NFlux.
Parameters
----------
sources : array_like, int
The indices of the source states (i.e. for state A in rxn A -> B)
sinks : array_like, int
Indices of sink states (state B)
NFlux : sparse matrix
Matrix of the net flux from A to B, see function GetFlux
Returns
-------
pi : array_like
The paths from A->B, pi[i] = node preceeding i
b : array_like
The flux passing through each node
See Also
--------
DijkstraTopPaths : child function
`DijkstraTopPaths` is probably the function you want to call to find
paths through an MSM network. This is a utility function called by
`DijkstraTopPaths`, but may be useful in some specific cases
"""
sources, sinks = _check_sources_sinks(sources, sinks)
# initialize data structures
if scipy.sparse.issparse(net_flux):
net_flux = net_flux.tolil()
else:
net_flux = scipy.sparse.lil_matrix(net_flux)
G = scipy.sparse.find(net_flux)
N = net_flux.shape[0]
b = np.zeros(N)
b[sources] = 1000
pi = np.zeros(N, dtype=int)
pi[sources] = -1
U = []
Q = sorted(range(N), key=lambda v: b[v])
for v in sinks:
Q.remove(v)
# run the Dijkstra algorithm
while len(Q) > 0:
w = Q.pop()
U.append(w)
# relax
for v in G[1][np.where(G[0] == w)]:
if b[v] < min(b[w], net_flux[w, v]):
b[v] = min(b[w], net_flux[w, v])
pi[v] = w
Q = sorted(Q, key=lambda v: b[v])
logger.info("Searched %s nodes", len(U) + len(sinks))
return pi, b
def _back_relax(s, b, pi, NFlux):
r"""
Updates a Djikstra calculation once a bottleneck is cut, quickly
recalculating only cost of nodes that change due to the cut.
Cuts & relaxes the B-side (sink side) of a cut edge (b2) to source from the
adjacent node with the most flux flowing to it. If there are no
adjacent source nodes, cuts the node out of the graph and relaxes the
nodes that were getting fed by b2 (the cut node).
Parameters
----------
s : int
the node b2
b : array_like
the cost function
pi : array_like
the backtrack array, a list such that pi[i] = source node of node i
NFlux : sparse matrix
Net flux matrix
Returns
-------
b : array_like
updated cost function
pi : array_like
updated backtrack array
NFlux : sparse matrix
net flux matrix
See Also
--------
DijkstraTopPaths : child function
`DijkstraTopPaths` is probably the function you want to call to find
paths through an MSM network. This is a utility function called by
`DijkstraTopPaths`, but may be useful in some specific cases
"""
G = scipy.sparse.find(NFlux)
if len(G[0][np.where(G[1] == s)]) > 0:
# For all nodes connected upstream to the node `s` in question,
# Re-source that node from the best option (lowest cost) one level lower
# Notation: j is node one level below, s is the one being considered
b[s] = 0 # set the cost to zero
for j in G[0][np.where(G[1] == s)]: # for each upstream node
if b[s] < min(b[j], NFlux[j, s]): # if that node has a lower cost
b[s] = min(b[j], NFlux[j, s]) # then set the cost to that node
pi[s] = j # and the source comes from there
# if there are no nodes connected to this one, then we need to go one
# level up and work there first
else:
for sprime in G[1][np.where(G[0] == s)]:
NFlux[s, sprime] = 0
b, pi, NFlux = _back_relax(sprime, b, pi, NFlux)
return b, pi, NFlux
def _backtrack(B, b, pi, NFlux):
"""
Works backwards to pull out a path from pi, where pi is a list such that
pi[i] = source node of node i. Begins at the largest staring incoming flux
point in B.
Parameters
----------
B : array_like, int
Indices of sink states (state B)
b : array_like
the cost function
pi : array_like
the backtrack array, a list such that pi[i] = source node of node i
NFlux : sparse matrix
net flux matrix
Returns
-------
bestpath : list
the list of nodes forming the highest flux path
bottleneck : tuple
a tupe of nodes, between which is the bottleneck
bestflux : float
the flux through `bestpath`
See Also
--------
DijkstraTopPaths : child function
`DijkstraTopPaths` is probably the function you want to call to find
paths through an MSM network. This is a utility function called by
`DijkstraTopPaths`, but may be useful in some specific cases
"""
# Select starting location
bestflux = 0
for Bnode in B:
path = [Bnode]
NotDone = True
while NotDone:
if pi[path[-1]] == -1:
break
else:
path.append(pi[path[-1]])
path.reverse()
bottleneck, flux = find_path_bottleneck(path, NFlux)
logger.debug('In Backtrack: Flux %s, bestflux %s', flux, bestflux)
if flux > bestflux:
bestpath = path
bestbottleneck = bottleneck
bestflux = flux
if flux == 0:
bestpath = []
bottleneck = (np.nan, np.nan)
bestflux = 0
return (bestpath, bestbottleneck, bestflux)
[docs]def find_path_bottleneck(path, net_flux):
"""
Simply finds the bottleneck along a path.
This is the point at which the cost function first goes up along the path,
backtracking from B to A.
Parameters
----------
path : list
a list of nodes along the path of interest
net_flux : matrix
the net flux matrix
Returns
-------
bottleneck : tuple
a tuple of the nodes on either end of the bottleneck
flux : float
the flux at the bottleneck
See Also
--------
find_top_paths : child function
`find_top_paths` is probably the function you want to call to find
paths through an MSM network. This is a utility function called by
`find_top_paths`, but may be useful in some specific cases
"""
if scipy.sparse.issparse(net_flux):
net_flux = net_flux.tolil()
flux = 100000. # initialize as large value
for i in range(len(path) - 1):
if net_flux[path[i], path[i + 1]] < flux:
flux = net_flux[path[i], path[i + 1]]
b1 = path[i]
b2 = path[i + 1]
return (b1, b2), flux
[docs]def calculate_fluxes(sources, sinks, tprob, populations=None, committors=None):
"""
Compute the transition path theory flux matrix.
Parameters
----------
sources : array_like, int
The set of unfolded/reactant states.
sinks : array_like, int
The set of folded/product states.
tprob : mm_matrix
The transition matrix.
Returns
------
fluxes : mm_matrix
The flux matrix
Optional Parameters
-------------------
populations : nd_array, float
The equilibrium populations, if not provided is re-calculated
committors : nd_array, float
The committors associated with `sources`, `sinks`, and `tprob`.
If not provided, is calculated from scratch. If provided, `sources`
and `sinks` are ignored.
"""
sources, sinks = _check_sources_sinks(sources, sinks)
msm_analysis.check_transition(tprob)
if scipy.sparse.issparse(tprob):
dense = False
else:
dense = True
# check if we got the populations
if populations is None:
eigens = msm_analysis.get_eigenvectors(tprob, 1)
if np.count_nonzero(np.imag(eigens[1][:, 0])) != 0:
raise ValueError('First eigenvector has imaginary components')
populations = np.real(eigens[1][:, 0])
# check if we got the committors
if committors is None:
committors = calculate_committors(sources, sinks, tprob)
# perform the flux computation
Indx, Indy = tprob.nonzero()
n = tprob.shape[0]
if dense:
X = np.zeros((n, n))
Y = np.zeros((n, n))
X[(np.arange(n), np.arange(n))] = populations * (1.0 - committors)
Y[(np.arange(n), np.arange(n))] = committors
else:
X = scipy.sparse.lil_matrix((n, n))
Y = scipy.sparse.lil_matrix((n, n))
X.setdiag(populations * (1.0 - committors))
Y.setdiag(committors)
if dense:
fluxes = np.dot(np.dot(X, tprob), Y)
fluxes[(np.arange(n), np.arange(n))] = np.zeros(n)
else:
fluxes = (X.tocsr().dot(tprob.tocsr())).dot(Y.tocsr())
# This should be the same as below, but it's a bit messy...
#fluxes = np.dot(np.dot(X.tocsr(), tprob.tocsr()), Y.tocsr())
fluxes = fluxes.tolil()
fluxes.setdiag(np.zeros(n))
return fluxes
[docs]def calculate_net_fluxes(sources, sinks, tprob, populations=None, committors=None):
"""
Computes the transition path theory net flux matrix.
Parameters
----------
sources : array_like, int
The set of unfolded/reactant states.
sinks : array_like, int
The set of folded/product states.
tprob : mm_matrix
The transition matrix.
Returns
------
net_fluxes : mm_matrix
The net flux matrix
Optional Parameters
-------------------
populations : nd_array, float
The equilibrium populations, if not provided is re-calculated
committors : nd_array, float
The committors associated with `sources`, `sinks`, and `tprob`.
If not provided, is calculated from scratch. If provided, `sources`
and `sinks` are ignored.
"""
sources, sinks = _check_sources_sinks(sources, sinks)
msm_analysis.check_transition(tprob)
if scipy.sparse.issparse(tprob):
dense = False
else:
dense = True
n = tprob.shape[0]
flux = calculate_fluxes(sources, sinks, tprob, populations, committors)
ind = flux.nonzero()
if dense:
net_flux = np.zeros((n, n))
else:
net_flux = scipy.sparse.lil_matrix((n, n))
for k in range(len(ind[0])):
i, j = ind[0][k], ind[1][k]
forward = flux[i, j]
reverse = flux[j, i]
net_flux[i, j] = max(0, forward - reverse)
return net_flux
###############################################################################
# MFPT & Committor Finding Functions
#
[docs]def calculate_ensemble_mfpt(sources, sinks, tprob, lag_time):
"""
Calculates the average 'Folding Time' of an MSM defined by T and a LagTime.
The Folding Time is the average of the MFPTs (to F) of all the states in U.
Note here 'Folding Time' is defined as the avg MFPT of {U}, to {F}.
Consider this carefully. This is probably NOT the experimental folding time!
Parameters
----------
sources : array, int
indices of the source states
sinks : array, int
indices of the sink states
tprob : matrix
transition probability matrix
lag_time : float
the lag time used to create T (dictates units of the answer)
Returns
-------
avg : float
the average of the MFPTs
std : float
the standard deviation of the MFPTs
"""
sources, sinks = _check_sources_sinks(sources, sinks)
msm_analysis.check_transition(tprob)
X = calculate_mfpt(sinks, tprob, lag_time)
times = np.zeros(len(sources))
for i in range(len(sources)):
times[i] = X[sources[i]]
return np.average(times), np.std(times)
[docs]def calculate_avg_TP_time(sources, sinks, tprob, lag_time):
"""
Calculates the Average Transition Path Time for MSM with: T, LagTime.
The TPTime is the average of the MFPTs (to F) of all the states
immediately adjacent to U, with the U states effectively deleted.
Note here 'TP Time' is defined as the avg MFPT of all adjacent states to {U},
to {F}, ignoring {U}.
Consider this carefully.
Parameters
----------
sources : array, int
indices of the unfolded states
sinks : array, int
indices of the folded states
tprob : matrix
transition probability matrix
lag_time : float
the lag time used to create T (dictates units of the answer)
Returns
-------
avg : float
the average of the MFPTs
std : float
the standard deviation of the MFPTs
"""
sources, sinks = _check_sources_sinks(sources, sinks)
msm_analysis.check_transition(tprob)
n = tprob.shape[0]
if scipy.sparse.issparse(tprob):
T = tprob.tolil()
P = scipy.sparse.lil_matrix((n, n))
else:
P = np.zeros((n, n))
for u in sources:
for i in range(n):
if i not in sources:
P[u, i] = T[u, i]
for u in sources:
T[u, :] = np.zeros(n)
T[:, u] = 0
for i in sources:
N = T[i, :].sum()
T[i, :] = T[i, :] / N
X = calculate_mfpt(sinks, tprob, lag_time)
TP = P * X.T
TPtimes = []
for time in TP:
if time != 0:
TPtimes.append(time)
return np.average(TPtimes), np.std(TPtimes)
[docs]def calculate_mfpt(sinks, tprob, lag_time=1.):
"""
Gets the Mean First Passage Time (MFPT) for all states to a *set*
of sinks.
Parameters
----------
sinks : array, int
indices of the sink states
tprob : matrix
transition probability matrix
LagTime : float
the lag time used to create T (dictates units of the answer)
Returns
-------
MFPT : array, float
MFPT in time units of LagTime, for each state (in order of state index)
See Also
--------
calculate_all_to_all_mfpt : function
A more efficient way to calculate all the MFPTs in a network
"""
sinks = _ensure_iterable(sinks)
msm_analysis.check_transition(tprob)
n = tprob.shape[0]
if scipy.sparse.isspmatrix(tprob):
tprob = tprob.tolil()
for state in sinks:
tprob[state, :] = 0.0
tprob[state, state] = 2.0
if scipy.sparse.isspmatrix(tprob):
tprob = tprob - scipy.sparse.eye(n, n)
tprob = tprob.tocsr()
else:
tprob = tprob - np.eye(n)
RHS = -1 * np.ones(n)
for state in sinks:
RHS[state] = 0.0
if scipy.sparse.isspmatrix(tprob):
MFPT = lag_time * scipy.sparse.linalg.spsolve(tprob, RHS)
else:
MFPT = lag_time * np.linalg.solve(tprob, RHS)
return MFPT
[docs]def calculate_all_to_all_mfpt(tprob, populations=None):
"""
Calculate the all-states by all-state matrix of mean first passage
times.
This uses the fundamental matrix formalism, and should be much faster
than GetMFPT for calculating many MFPTs.
Parameters
----------
tprob : matrix
transition probability matrix
populations : array_like, float
optional argument, the populations of each state. If not supplied,
it will be computed from scratch
Returns
-------
MFPT : array, float
MFPT in time units of LagTime, square array for MFPT from i -> j
See Also
--------
GetMFPT : function
for calculating a subset of the MFPTs, with functionality for including
a set of sinks
"""
msm_analysis.check_transition(tprob)
if scipy.sparse.issparse(tprob):
tprob = tprob.toarray()
logger.warning('calculate_all_to_all_mfpt does not support sparse linear algebra')
if populations is None:
eigens = msm_analysis.get_eigenvectors(tprob, 1)
if np.count_nonzero(np.imag(eigens[1][:, 0])) != 0:
raise ValueError('First eigenvector has imaginary parts')
populations = np.real(eigens[1][:, 0])
# ensure that tprob is a transition matrix
msm_analysis.check_transition(tprob)
num_states = len(populations)
if tprob.shape[0] != num_states:
raise ValueError("Shape of tprob and populations vector don't match")
eye = np.transpose(np.matrix(np.ones(num_states)))
limiting_matrix = eye * populations
#z = scipy.linalg.inv(scipy.sparse.eye(num_states, num_states) - (tprob - limiting_matrix))
z = scipy.linalg.inv(np.eye(num_states) - (tprob - limiting_matrix))
# mfpt[i,j] = z[j,j] - z[i,j] / pi[j]
mfpt = -z
for j in range(num_states):
mfpt[:, j] += z[j, j]
mfpt[:, j] /= populations[j]
return mfpt
[docs]def calculate_committors(sources, sinks, tprob):
"""
Get the forward committors of the reaction sources -> sinks.
Parameters
----------
sources : array_like, int
The set of unfolded/reactant states.
sinks : array_like, int
The set of folded/product states.
tprob : mm_matrix
The transition matrix.
Returns
-------
Q : array_like
The forward committors for the reaction U -> F.
"""
sources, sinks = _check_sources_sinks(sources, sinks)
msm_analysis.check_transition(tprob)
if scipy.sparse.issparse(tprob):
dense = False
tprob = tprob.tolil()
else:
dense = True
# construct the committor problem
n = tprob.shape[0]
if dense:
T = np.eye(n) - tprob
else:
T = scipy.sparse.eye(n, n, 0, format='lil') - tprob
T = T.tolil()
for a in sources:
T[a, :] = 0.0 # np.zeros(n)
T[:, a] = 0.0
T[a, a] = 1.0
for b in sinks:
T[b, :] = 0.0 # np.zeros(n)
T[:, b] = 0.0
T[b, b] = 1.0
IdB = np.zeros(n)
IdB[sinks] = 1.0
if dense:
RHS = np.dot(tprob, IdB)
else:
RHS = tprob.dot(IdB)
# This should be the same as below
#RHS = tprob * IdB
RHS[sources] = 0.0
RHS[sinks] = 1.0
# solve for the committors
if dense == False:
Q = scipy.sparse.linalg.spsolve(T.tocsr(), RHS)
else:
Q = np.linalg.solve(T, RHS)
epsilon = 0.001
assert np.all(Q <= 1.0 + epsilon)
assert np.all(Q >= 0.0 - epsilon)
return Q
######################################################################
# Functions for computing hub scores, conditional committors, and
# related quantities
#
[docs]def calculate_fraction_visits(tprob, waypoint, source, sink, return_cond_Q=False):
"""
Calculate the fraction of times a walker on `tprob` going from `sources`
to `sinks` will travel through the set of states `waypoints` en route.
Computes the conditional committors q^{ABC^+} and uses them to find the
fraction of paths mentioned above. The conditional committors can be
Note that in the notation of Dickson et. al. this computes h_c(A,B), with
sources = A
sinks = B
waypoint = C
Parameters
----------
tprob : matrix
The transition probability matrix
waypoint : int
The index of the intermediate state
sources : nd_array, int or int
The indices of the source state(s)
sinks : nd_array, int or int
The indices of the sink state(s)
return_cond_Q : bool
Whether or not to return the conditional committors
Returns
-------
fraction_paths : float
The fraction of times a walker going from `sources` -> `sinks` stops
by `waypoints` on its way.
cond_Q : nd_array, float (optional)
Optionally returned (`return_cond_Q`)
See Also
--------
calculate_hub_score : function
Compute the 'hub score', the weighted fraction of visits for an
entire network.
calculate_all_hub_scores : function
Wrapper to compute all the hub scores in a network.
Notes
-----
Employs dense linear algebra,
memory use scales as N^2
cycle use scales as N^3
References
----------
..[1] Dickson & Brooks (2012), J. Chem. Theory Comput.,
Article ASAP DOI: 10.1021/ct300537s
"""
# do some typechecking - we need to be sure that the lumped sources are in
# the second to last row, and the lumped sinks are in the last row
# check `tprob`
msm_analysis.check_transition(tprob)
if type(tprob) != np.ndarray:
try:
tprob = tprob.todense()
except AttributeError as e:
raise TypeError('Argument `tprob` must be convertable to a dense'
'numpy array. \n%s' % e)
# typecheck
for data in [source, sink, waypoint]:
if type(data) == int:
pass
elif hasattr(data, 'len'):
if len(data) == 1:
data = data[0]
else:
raise TypeError('Arguments source/sink/waypoint must be an int')
if (source == waypoint) or (sink == waypoint) or (sink == source):
raise ValueError('source, sink, waypoint must all be disjoint!')
N = tprob.shape[0]
Q = calculate_committors([source], [sink], tprob)
# permute the transition matrix into cannonical form - send waypoint the the
# last row, and source + sink to the end after that
Bsink_indices = [source, sink, waypoint]
perm = np.arange(N)
perm = np.delete(perm, Bsink_indices)
perm = np.append(perm, Bsink_indices)
T = MSMLib.permute_mat(tprob, perm)
# extract P, R
n = N - len(Bsink_indices)
P = T[:n, :n]
R = T[:n, n:]
# calculate the conditional committors ( B = N*R ), B[i,j] is the prob
# state i ends in j, where j runs over the source + sink + waypoint
# (waypoint is position -1)
B = np.dot(np.linalg.inv(np.eye(n) - P), R)
# Not sure if this is sparse or not...
# add probs for the sinks, waypoint / b[i] is P( i --> {C & not A, B} )
b = np.append(B[:, -1].flatten(), [0.0] * (len(Bsink_indices) - 1) + [1.0])
cond_Q = b * Q[waypoint]
epsilon = 1e-6 # some numerical give, hard-coded
assert cond_Q.shape == (N,)
assert np.all(cond_Q <= 1.0 + epsilon)
assert np.all(cond_Q >= 0.0 - epsilon)
assert np.all(cond_Q <= Q[perm] + epsilon)
# finally, calculate the fraction of paths h_C(A,B) (eq. 7 in [1])
fraction_paths = np.sum(T[-3, :] * cond_Q) / np.sum(T[-3, :] * Q[perm])
assert fraction_paths <= 1.0
assert fraction_paths >= 0.0
if return_cond_Q:
cond_Q = cond_Q[np.argsort(perm)] # put back in orig. order
return fraction_paths, cond_Q
else:
return fraction_paths
[docs]def calculate_hub_score(tprob, waypoint):
"""
Calculate the hub score for the states `waypoint`.
The "hub score" is a measure of how well traveled a certain state or
set of states is in a network. Specifically, it is the fraction of
times that a walker visits a state en route from some state A to another
state B, averaged over all combinations of A and B.
Parameters
----------
tprob : matrix
The transition probability matrix
waypoints : int
The indices of the intermediate state(s)
Returns
-------
Hc : float
The hub score for the state composed of `waypoints`
See Also
--------
calculate_fraction_visits : function
Calculate the fraction of times a state is visited on pathways going
from a set of "sources" to a set of "sinks".
calculate_all_hub_scores : function
A more efficient way to compute the hub score for every state in a
network.
Notes
-----
Employs dense linear algebra,
memory use scales as N^2
cycle use scales as N^5
References
----------
..[1] Dickson & Brooks (2012), J. Chem. Theory Comput.,
Article ASAP DOI: 10.1021/ct300537s
"""
msm_analysis.check_transition(tprob)
# typecheck
if type(waypoint) != int:
if hasattr(waypoint, '__len__'):
if len(waypoint) == 1:
waypoint = waypoint[0]
else:
raise ValueError('Must pass waypoints as int or list/array of ints')
else:
raise ValueError('Must pass waypoints as int or list/array of ints')
# find out which states to include in A, B (i.e. everything but C)
N = tprob.shape[0]
states_to_include = range(N)
states_to_include.remove(waypoint)
# calculate the hub score
Hc = 0.0
for s1 in states_to_include:
for s2 in states_to_include:
if (s1 != s2) and (s1 != waypoint) and (s2 != waypoint):
Hc += calculate_fraction_visits(tprob, waypoint,
s1, s2, return_cond_Q=False)
Hc /= ((N - 1) * (N - 2))
return Hc
[docs]def calculate_all_hub_scores(tprob):
"""
Calculate the hub scores for all states in a network defined by `tprob`.
The "hub score" is a measure of how well traveled a certain state or
set of states is in a network. Specifically, it is the fraction of
times that a walker visits a state en route from some state A to another
state B, averaged over all combinations of A and B.
Parameters
----------
tprob : matrix
The transition probability matrix
Returns
-------
Hc_array : nd_array, float
The hub score for each state in `tprob`
See Also
--------
calculate_fraction_visits : function
Calculate the fraction of times a state is visited on pathways going
from a set of "sources" to a set of "sinks".
calculate_hub_score : function
A function that computes just one hub score, can compute the hub score
for a set of states.
Notes
-----
Employs dense linear algebra,
memory use scales as N^2
cycle use scales as N^6
References
----------
..[1] Dickson & Brooks (2012), J. Chem. Theory Comput.,
Article ASAP DOI: 10.1021/ct300537s
"""
N = tprob.shape[0]
states = range(N)
# calculate the hub score
Hc_array = np.zeros(N)
# loop over each state and compute it's hub score
for i, waypoint in enumerate(states):
Hc = 0.0
# now loop over all combinations of sources/sinks and average
for s1 in states:
if waypoint != s1:
for s2 in states:
if s1 != s2:
if waypoint != s2:
Hc += calculate_fraction_visits(tprob, waypoint, s1, s2)
# store the hub score in an array
Hc_array[i] = Hc / ((N - 1) * (N - 2))
return Hc_array
######################################################################
# ALIASES FOR DEPRECATED FUNCTION NAMES
# THESE FUNCTIONS WERE ADDED FOR VERSION 2.6 AND
# CAN BE REMOVED IN VERSION 2.7
######################################################################
@deprecated(calculate_committors, '2.7')
def GetBCommittorsEqn(U, F, T0, EquilibriumPopulations):
pass
@deprecated(calculate_committors, '2.7')
def GetFCommittorsEqn(A, B, T0, dense=False):
pass
# TJL: We don't need a "get backwards committors", since they are just
# 1 minus the forward committors
@deprecated(calculate_committors, '2.7')
def GetBCommittors(U, F, T0, EquilibriumPopulations, X0=None, Dense=False):
pass
@deprecated(calculate_mfpt, '2.7')
def MFPT():
pass
@deprecated(calculate_committors, '2.7')
def GetFCommittors():
pass
@deprecated(calculate_fluxes, '2.7')
def GetFlux():
print "WARNING: The call signature for the new function has changed."
pass
@deprecated(calculate_net_fluxes, '2.7')
def GetNetFlux():
print "WARNING: The call signature for the new function has changed."
pass
@deprecated(calculate_avg_TP_time, '2.7')
def CalcAvgTPTime():
pass
@deprecated(calculate_ensemble_mfpt, '2.7')
def CalcAvgFoldingTime():
pass