Source code for fluxpy.utils.analysis


import numpy as np
import pandas as pd
import math
import networkx as nx
from .utils import NestedDataFrameType

[docs]def get_nutrients_gradient(model, nutrients=None, upper_bound=None, step=None) -> NestedDataFrameType: """ Assuming a single medium comound changes at a time, it returns FVA findings over the chaning envs. The model.medium will be used, i.e. first set the medium you want to investigate to your model. Returns a df with as many rows as the model.medium and columns as many as their quotient by the step. Each cell of this df includes the fva result for the medium with the corresponding alteration. For example, df.loc[0,0] corresponds to the medium where the first compound has the lowest value possible. model -- a cobra model nutrients -- list of reaction ids for which a gradient will be calculated upper_bound -- maximum a flux can get step -- increase of upper_bound in each iteration of the gradient """ from cobra.flux_analysis import flux_variability_analysis if upper_bound is None: upper_bound = max(model.medium.values()) if step is None: num_of_chunks = 5 step = math.floor(upper_bound / num_of_chunks) else: num_of_chunks = math.floor(upper_bound / step) modulo = (upper_bound % num_of_chunks) + 1 if nutrients is None: nutrients = model.medium.copy() num_of_medium_compounds = len(nutrients) else: num_of_medium_compounds = len(nutrients) # [NOTE] If a column is added at a later point using the pd.at() it's gonna be nan df = pd.DataFrame(index=range(num_of_medium_compounds), columns=range(num_of_chunks + 1)) initial_model = model.copy() x_coord = 0 for compound in nutrients: print(compound, str(x_coord) + "/" + str(num_of_medium_compounds)) model_in = initial_model.copy() j_coord = 0 for j in np.arange(0, upper_bound+modulo, step): if j > upper_bound: j = upper_bound limited_medium = model_in.medium limited_medium[compound] = j model_in.medium = limited_medium try: fva = flux_variability_analysis(model_in) except: print("Infeasible for ", compound, "with upper bound of ", str(j)) fva = pd.DataFrame(data=[[0]*len(model.reactions)] * 2).T fva.columns = ["minimum", "maximum"] fva.index = [x.id for x in model.reactions] df.at[x_coord, j_coord] = fva j_coord += 1 x_coord += 1 df.index = nutrients columns = [] for i in range(num_of_chunks + 1): columns.append("ub_" + str(i*step)) df.columns = columns return df
# %% Util functions: flux coupling analysis
[docs]def parse_qfca(qfca_output, model=None, remove_exchange_routes=True, exclude_biomass=True, format="csv"): """ Parses QFCA (Quantitative Fatty Acid Composition Analysis) output data into a networkx graph representation. Args: qfca_output (str): Path to the QFCA output file in CSV or Excel format. model (object, optional): Metabolic model object. Default is None. remove_exchange_routes (bool, optional): Whether to remove exchange reactions from the graph. Default is True. exclude_biomass (bool, optional): Whether to exclude biomass reaction from the graph. Default is True. format (str, optional): Format of the QFCA output file. Can be 'csv' or 'xlsx'. Default is 'csv'. Returns: nx.Graph: A networkx Graph representing the interactions between metabolic reactions based on QFCA data. """ if format=="csv": df = pd.read_csv(qfca_output, index_col=0) elif format=="xlsx": df = pd.read_excel(qfca_output, index_col=0) # Create a graphimport networkx as nx G = nx.Graph() # Add nodes G.add_nodes_from(df.index) # Check if model is provided and get biomass_reaction if possible biomass_reaction = None if model is not None and model.objective is not None: objective = str(model.summary()._objective) biomass_reaction = objective.split(" ")[1] """ 1 - fully coupled reactions - Black // Grey // dark green (#004D40) 2 - partially coupled reactions - Blue // SteelBlue // Cyan (#1E88E5) 3 - reaction i is directionally coupled to reaction j - Red // DarkSalmon // magenta (#D81B60) 4 - reaction j is directionally coupled to reaction i - Green // LightSeaGreen // dark yellow (#FFC107) [NOTE] we now use dark yellow for both 3 and 4 and the magenta as EDGE_SECTED_PAINT in the style json file. """ pallette = {1: 'black', 2: 'blue', 3: 'gree', 4: 'green' } parsed_pairs = set() counter = 0 for source in df.index: if model is not None and exclude_biomass and biomass_reaction is not None: if source == biomass_reaction: continue for target in df.index: if model is not None and exclude_biomass and biomass_reaction is not None: if target == biomass_reaction: continue # Check if pair is already parsed if (source, target) in parsed_pairs or (target, source) in parsed_pairs: continue # Add pair in parsed and parsed_pairs.add((source, target)) value = df.loc[source, target] # Ignore cases where i = j or value is 0 if source != target and value != 0: counter += 1 color = pallette.get(value, 'gray') # In case where j coupled to i, reverse the source - target if value == 4: G.add_edge(target, source, color=color) G.add_edge(source, target, color=color) # Compute the degree of each node node_degrees = dict(G.degree()) # Create a subgraph with nodes that have non-zero degree subgraph_nodes = [node for node, degree in node_degrees.items() if degree > 0] # Remove routes that include exchange reactions if model is not None and remove_exchange_routes: exchange_reactions = [] for r in model.reactions: if 'e' in r.compartments or 'e0' in r.compartments: exchange_reactions.append(r.id) tmp = subgraph_nodes.copy() for node in tmp: if node in exchange_reactions: subgraph_nodes.remove(node) # Build a graph based on the subgraph nodes qfca_graph = G.subgraph(subgraph_nodes) if model: # graph's nodes are reactions, we ll add model name and stoichiometry as attributes for node in qfca_graph.nodes: r = model.reactions.get_by_id(node) reactants = [i.name for i in r.reactants] products = [i.name for i in r.products] qfca_graph.nodes[node]['rxn_name'] = r.name qfca_graph.nodes[node]['rxn_reactants'] = ';'.join(reactants) qfca_graph.nodes[node]['rxn_products'] = ';'.join(products) return qfca_graph
[docs]def samples_on_qfca(qfca_graph, samples): """ ongoing """ from sklearn.metrics import silhouette_score from sklearn.cluster import KMeans from sklearn import metrics graph_rxns = list(qfca_graph.nodes) # df = np.zeros( [len(list(qfca_graph.nodes)), samples.shape[1] ]) samples_with_graph_rnxs = samples.loc[graph_rxns] # Assuming df is your DataFrame containing continuous values # Initialize a list to store inertias inertias = [] # Define the range of k values you want to test k_range = range(1, 11) # You can adjust this range as needed # Calculate inertia for each k for k in k_range: kmeans = KMeans(n_clusters=k) kmeans.fit(samples_with_graph_rnxs) inertias.append(kmeans.inertia_)
# %% Util functions: flux balance analysis
[docs]def producing_or_consuming_a_met(model, reaction_id, metabolite_id): """ Returns whether a metabolite is being produced or consumed when model is optimized. Args: model (cobra.Model) reaction_id (str) metabolite_id (str) Returns: 'producing | consuming' (str) """ r = model.reactions.get_by_id(reaction_id) met = model.metabolites.get_by_id(metabolite_id) flux_value = r.summary().to_frame()["flux"].item() if met not in r.reactants and met not in r.products: return ValueError("the metabolite you are asking is not part of the reaction you gave.") if (flux_value > 0 and met in r.products) or ( flux_value < 0 and met in r.reactants): return "producing" elif (flux_value > 0 and met in r.reactants) or (flux_value < 0 and met in r.products): return "consuming"
[docs]def get_reactions_producing_a_met(model, metabolite_id): """ Get reactions that produce a specific metabolite. Notes: Exloits the producing_or_consuming_a_met() function Args: model (cobra.Model) metabolite_id (str) Returns: list of reactions (cobra.Reaction) """ rxns = [] for reaction in model.metabolites.get_by_id(metabolite_id).reactions: if producing_or_consuming_a_met(model, reaction_id=reaction.id, metabolite_id=metabolite_id) == "producing": rxns.append(reaction) return rxns
[docs]def trace_path_with_backtracking_iterative(model, start_reaction_id, target_reaction_id, ignore_mets=None): """ Trace a path from the start reaction to the target reaction through reactants, using an iterative DFS approach, with backtracking when exchange reactions are encountered. Args: model (cobra.Model): COBRApy model object. start_reaction_id (str): The ID of the starting reaction. target_reaction_id (str): The ID of the target reaction. ignore_mets: A list of metabolite IDs to ignore during tracing (optional). Returns: keep_rxns: A set of reactions needed to go from start to target, avoiding dead-end exchange reactions. """ def is_exchange_reaction(reaction): """Check if a reaction is an exchange reaction.""" return len(reaction.reactants) == 0 or len(reaction.products) == 0 or reaction.id.startswith("EX_") # Initialize structures for DFS visited_reactions = set() # Tracks all reactions visited to prevent revisiting stack = [(model.reactions.get_by_id(start_reaction_id), [])] valid_paths = [] # List to store valid paths that do not dead-end at exchange reactions dead_end_paths = set() # Tracks reactions leading to dead-end exchange reactions keep_rxns = set() # Final set of reactions in valid paths while stack: current_reaction, path = stack.pop() # If we reached the target, store the path and continue exploring for other valid paths if current_reaction.id == target_reaction_id: valid_paths.append(path + [current_reaction.id]) continue # Skip if the current reaction is an exchange reaction if is_exchange_reaction(current_reaction): dead_end_paths.add(current_reaction.id) continue visited_reactions.add(current_reaction.id) path.append(current_reaction.id) # Track the path # Explore the reactants of the current reaction if current_reaction.summary().to_frame()["flux"].item() > 0: reactants = current_reaction.reactants else: reactants = current_reaction.products found_valid_branch = False # Track if any valid branches are found from the current reaction for metabolite in reactants: if metabolite.id in ignore_mets: continue # Find reactions that produce this metabolite rxns = get_reactions_producing_a_met(model, metabolite_id=metabolite.id) for reaction in rxns: flux_value = reaction.summary().to_frame()["flux"].item() if flux_value == 0: continue if reaction.id not in visited_reactions and reaction.id not in dead_end_paths: # If we find a valid branch, continue exploring it stack.append((reaction, path.copy())) # Add new reaction to stack with the updated path found_valid_branch = True keep_rxns.add(reaction.id) # If no valid branch was found, mark this reaction as a dead-end (exchange reaction or no valid paths) if not found_valid_branch: dead_end_paths.add(current_reaction.id) # If no valid path found, return an empty set if not valid_paths: return set() # Return the reactions that are part of any valid path keep_rxns.add(start_reaction_id) return keep_rxns