Source code for fame.FVM.visualization

import os
import vtk
import numpy as np
from .mesh import StructuredMesh, StructuredMesh1D

[docs] class MeshWriter: def __new__(cls, mesh): if isinstance(mesh, StructuredMesh1D): return super().__new__(MeshWriter1D) return super().__new__(MeshWriter3D) def __init__(self, mesh): """ Initialize with a StructuredMesh instance. Args: mesh (StructuredMesh): The structured mesh object containing the grid and scalar data. """ if not isinstance(mesh, StructuredMesh): raise TypeError("The provided mesh must be an instance of StructuredMesh.") self.mesh = mesh
[docs] class MeshWriter3D(MeshWriter): """ Definition of 3D mesh writer. """ def _writeSingleVTS(self, output_file, variables): """ Writes variables (scalar, vector, tensor) of the StructuredMesh to a .vts file. Args: output_file (str): Path to the output VTS file. variables (dict): Dictionary where keys are variable names and values are numpy arrays. Each array should have the correct shape for the mesh dimensions. """ if not output_file.endswith('.vts'): output_file += '.vts' if not hasattr(self.mesh, 'dimensions') or not hasattr(self.mesh, 'GetPoints'): raise ValueError("The provided mesh must have 'dimensions' and 'GetPoints' attributes.") points = self.mesh.GetPoints() num_points = points.GetNumberOfPoints() num_cells = self.mesh.GetNumberOfCells() if not points or num_points == 0: raise ValueError("The provided mesh must have valid vtkPoints.") for var_name, var_data in variables.items(): var_array = vtk.vtkDoubleArray() var_array.SetName(var_name) if var_data.shape[0] == num_points: # Write to point data target = self.mesh.GetPointData() elif var_data.shape[0] == num_cells: # Write to cell data target = self.mesh.GetCellData() else: raise ValueError( f"Mismatch between '{var_name}' size and grid dimensions. " f"Expected {num_points} for points or {num_cells} for cells." ) # Set number of components for scalar, vector, or tensor data if var_data.ndim == 1: var_array.SetNumberOfComponents(1) # Explicit for scalar for value in var_data: var_array.InsertNextValue(value) else: var_array.SetNumberOfComponents(var_data.shape[1]) # Vector or tensor for value in var_data: var_array.InsertNextTuple(value) target.AddArray(var_array) writer = vtk.vtkXMLStructuredGridWriter() writer.SetFileName(output_file) writer.SetInputData(self.mesh) writer.Write() print(f"Structured mesh with variables written to {output_file}")
[docs] def writeVTS(self, output_dir, variables, time=None, step=None): """ Writes a single timestep data to a .vts file and updates the PVD file. For steady-state, it writes a single time step. Args: output_dir (str): Directory to save the output .vts file and PVD file. variables (dict): Dictionary of variables for the timestep. time (float, optional): Time value for the current timestep. Defaults to 0.0 for steady-state. step (int, optional): Step index for naming the .vts file. Defaults to 0 for steady-state. """ os.makedirs(output_dir, exist_ok=True) pvd_file = os.path.join(output_dir, os.path.basename(output_dir) + '.pvd') if not os.path.exists(pvd_file): # Initialize a new PVD file if it doesn't exist with open(pvd_file, 'w', newline='') as f: f.write('<VTKFile type="Collection" version="0.1">\n') f.write(' <Collection>\n') f.write(' </Collection>\n') f.write('</VTKFile>\n') time = 0.0 if time is None else time step = 0 if step is None else step vts_file = os.path.join(output_dir, f"output_{step:04d}.vts") self._writeSingleVTS(vts_file, variables) # Update the PVD file with open(pvd_file, 'r+', newline='') as f: lines = f.readlines() insert_index = len(lines) - 2 lines.insert(insert_index, f' <DataSet timestep="{time}" group="" part="0" file="{os.path.basename(vts_file)}"/>\n') f.seek(0) f.writelines(lines) print(f"Updated PVD file: {pvd_file} with timestep {time} and file {vts_file}")
[docs] class MeshWriter1D(MeshWriter): def _writeSingleVTP(self, output_file, variables): if not output_file.endswith('.vtp'): output_file += '.vtp' points = self.mesh.GetPoints() num_points = points.GetNumberOfPoints() if not points or num_points == 0: raise ValueError("The provided mesh must have valid vtkPoints.") for var_name, var_data in variables.items(): var_array = vtk.vtkDoubleArray() var_array.SetName(var_name) # Determine if data is for points or cells if var_data.shape[0] == num_points: target = self.mesh.GetPointData() elif var_data.shape[0] == self.mesh.GetNumberOfCells(): target = self.mesh.GetCellData() else: raise ValueError( f"Mismatch between '{var_name}' size and mesh. " f"Expected {num_points} for points or {self.mesh.GetNumberOfCells()} for cells." ) # Set components based on data shape (scalar, vector, tensor) if var_data.ndim == 1: var_array.SetNumberOfComponents(1) for value in var_data: var_array.InsertNextValue(value) else: var_array.SetNumberOfComponents(var_data.shape[1]) for value in var_data: var_array.InsertNextTuple(value) target.AddArray(var_array) # Write the PolyData mesh to .vtp writer = vtk.vtkXMLPolyDataWriter() writer.SetFileName(output_file) writer.SetInputData(self.mesh) writer.Write() print(f"PolyData mesh written to {output_file}")
[docs] def writeVTS(self, output_dir, variables, time=None, step=None): os.makedirs(output_dir, exist_ok=True) pvd_file = os.path.join(output_dir, os.path.basename(output_dir) + '.pvd') # Create new PVD file if it doesn't exist if not os.path.exists(pvd_file): with open(pvd_file, 'w') as f: f.write('<VTKFile type="Collection" version="0.1">\n') f.write(' <Collection>\n') f.write(' </Collection>\n') f.write('</VTKFile>\n') time = 0.0 if time is None else time step = 0 if step is None else step vtp_file = os.path.join(output_dir, f"output_{step:04d}.vtp") self._writeSingleVTP(vtp_file, variables) # Update the PVD file with the new VTP entry with open(pvd_file, 'r+') as f: lines = f.readlines() insert_index = len(lines) - 2 lines.insert(insert_index, f' <DataSet timestep="{time}" file="{os.path.basename(vtp_file)}"/>\n') f.seek(0) f.writelines(lines) print(f"Updated PVD file: {pvd_file}")