Source code for dolfinx_mpc.utils.mpc_utils

# Copyright (C) 2020-2021 Jørgen S. Dokken
#
# This file is part of DOLFINX_MPC
#
# SPDX-License-Identifier:    MIT
from __future__ import annotations

from mpi4py import MPI
from petsc4py import PETSc

import dolfinx.common as _common
import dolfinx.cpp as _cpp
import dolfinx.fem as _fem
import dolfinx.geometry as _geometry
import dolfinx.la as _la
import dolfinx.log as _log
import dolfinx.mesh as _mesh
import numpy as np
import ufl
from dolfinx import default_scalar_type as _dt

import dolfinx_mpc.cpp

__all__ = [
    "rotation_matrix",
    "facet_normal_approximation",
    "log_info",
    "rigid_motions_nullspace",
    "determine_closest_block",
    "create_normal_approximation",
    "create_point_to_point_constraint",
]


def rotation_matrix(axis, angle):
    # See https://en.wikipedia.org/wiki/Rotation_matrix,
    # Subsection: Rotation_matrix_from_axis_and_angle.
    if np.isclose(np.inner(axis, axis), 1):
        n_axis = axis
    else:
        # Normalize axis
        n_axis = axis / np.sqrt(np.inner(axis, axis))

    # Define cross product matrix of axis
    axis_x = np.array([[0, -n_axis[2], n_axis[1]], [n_axis[2], 0, -n_axis[0]], [-n_axis[1], n_axis[0], 0]])
    identity = np.cos(angle) * np.eye(3)
    outer = (1 - np.cos(angle)) * np.outer(n_axis, n_axis)
    return np.sin(angle) * axis_x + identity + outer


[docs] def facet_normal_approximation( V, mt: _mesh.MeshTags, mt_id: int, tangent=False, jit_options: dict = {}, form_compiler_options: dict = {}, ): """ Approximate the facet normal by projecting it into the function space for a set of facets Args: V: The function space to project into mt: The `dolfinx.mesh.MeshTagsMetaClass` containing facet markers mt_id: The id for the facets in `mt` we want to represent the normal at tangent: To approximate the tangent to the facet set this flag to `True` jit_options: Parameters used in CFFI JIT compilation of C code generated by FFCx. See https://github.com/FEniCS/dolfinx/blob/main/python/dolfinx/jit.py#L22-L37 for all available parameters. Takes priority over all other parameter values. form_compiler_options: Parameters used in FFCx compilation of this form. Run `ffcx - -help` at the commandline to see all available options. Takes priority over all other parameter values, except for `scalar_type` which is determined by DOLFINx. """ timer = _common.Timer("~MPC: Facet normal projection") comm = V.mesh.comm n = ufl.FacetNormal(V.mesh) nh = _fem.Function(V) u, v = ufl.TrialFunction(V), ufl.TestFunction(V) ds = ufl.ds(domain=V.mesh, subdomain_data=mt, subdomain_id=mt_id) if tangent: if V.mesh.geometry.dim == 1: raise ValueError("Tangent not defined for 1D problem") elif V.mesh.geometry.dim == 2: a = ufl.inner(u, v) * ds L = ufl.inner(ufl.as_vector([-n[1], n[0]]), v) * ds else: def tangential_proj(u, n): """ See for instance: https://link.springer.com/content/pdf/10.1023/A:1022235512626.pdf """ return (ufl.Identity(u.ufl_shape[0]) - ufl.outer(n, n)) * u c = _fem.Constant(V.mesh, [1, 1, 1]) a = ufl.inner(u, v) * ds L = ufl.inner(tangential_proj(c, n), v) * ds else: a = ufl.inner(u, v) * ds L = ufl.inner(n, v) * ds # Find all dofs that are not boundary dofs imap = V.dofmap.index_map all_blocks = np.arange(imap.size_local, dtype=np.int32) top_blocks = _fem.locate_dofs_topological(V, V.mesh.topology.dim - 1, mt.find(mt_id)) deac_blocks = all_blocks[np.isin(all_blocks, top_blocks, invert=True)] # Note there should be a better way to do this # Create sparsity pattern only for constraint + bc bilinear_form = _fem.form(a, jit_options=jit_options, form_compiler_options=form_compiler_options) pattern = _fem.create_sparsity_pattern(bilinear_form) pattern.insert_diagonal(deac_blocks) pattern.finalize() u_0 = _fem.Function(V) u_0.x.petsc_vec.set(0) bc_deac = _fem.dirichletbc(u_0, deac_blocks) A = _cpp.la.petsc.create_matrix(comm, pattern) A.zeroEntries() # Assemble the matrix with all entries form_coeffs = _cpp.fem.pack_coefficients(bilinear_form._cpp_object) form_consts = _cpp.fem.pack_constants(bilinear_form._cpp_object) _cpp.fem.petsc.assemble_matrix(A, bilinear_form._cpp_object, form_consts, form_coeffs, [bc_deac._cpp_object]) if bilinear_form.function_spaces[0] is bilinear_form.function_spaces[1]: A.assemblyBegin(PETSc.Mat.AssemblyType.FLUSH) # type: ignore A.assemblyEnd(PETSc.Mat.AssemblyType.FLUSH) # type: ignore _cpp.fem.petsc.insert_diagonal(A, bilinear_form.function_spaces[0], [bc_deac._cpp_object], 1.0) A.assemble() linear_form = _fem.form(L, jit_options=jit_options, form_compiler_options=form_compiler_options) b = _fem.petsc.assemble_vector(linear_form) _fem.petsc.apply_lifting(b, [bilinear_form], [[bc_deac]]) b.ghostUpdate(addv=PETSc.InsertMode.ADD_VALUES, mode=PETSc.ScatterMode.REVERSE) # type: ignore _fem.petsc.set_bc(b, [bc_deac]) # Solve Linear problem solver = PETSc.KSP().create(V.mesh.comm) # type: ignore solver.setType("cg") solver.rtol = 1e-8 solver.setOperators(A) solver.solve(b, nh.x.petsc_vec) nh.x.petsc_vec.ghostUpdate(addv=PETSc.InsertMode.INSERT, mode=PETSc.ScatterMode.FORWARD) # type: ignore timer.stop() solver.destroy() b.destroy() return nh
[docs] def log_info(message): """ Wrapper for logging a simple string on the zeroth communicator Reverting the log level """ old_level = _log.get_log_level() if MPI.COMM_WORLD.rank == 0: _log.set_log_level(_log.LogLevel.INFO) _log.log(_log.LogLevel.INFO, message) _log.set_log_level(old_level)
[docs] def rigid_motions_nullspace(V: _fem.FunctionSpace): """ Function to build nullspace for 2D/3D elasticity. Args: V: The function space """ _x = _fem.Function(V) # Get geometric dim gdim = V.mesh.geometry.dim assert gdim == 2 or gdim == 3 # Set dimension of nullspace dim = 3 if gdim == 2 else 6 # Create list of vectors for null space nullspace_basis = [ _la.vector(V.dofmap.index_map, bs=V.dofmap.index_map_bs, dtype=PETSc.ScalarType) # type: ignore for i in range(dim) ] basis = [b.array for b in nullspace_basis] dofs = [V.sub(i).dofmap.list.reshape(-1) for i in range(gdim)] # Build translational null space basis for i in range(gdim): basis[i][dofs[i]] = 1.0 # Build rotational null space basis x = V.tabulate_dof_coordinates() dofs_block = V.dofmap.list.reshape(-1) x0, x1, x2 = x[dofs_block, 0], x[dofs_block, 1], x[dofs_block, 2] if gdim == 2: basis[2][dofs[0]] = -x1 basis[2][dofs[1]] = x0 elif gdim == 3: basis[3][dofs[0]] = -x1 basis[3][dofs[1]] = x0 basis[4][dofs[0]] = x2 basis[4][dofs[2]] = -x0 basis[5][dofs[2]] = x1 basis[5][dofs[1]] = -x2 for b in nullspace_basis: b.scatter_forward() _la.orthonormalize(nullspace_basis) assert _la.is_orthonormal(nullspace_basis, float(np.finfo(_x.x.array.dtype).eps)) local_size = V.dofmap.index_map.size_local * V.dofmap.index_map_bs basis_petsc = [ PETSc.Vec().createWithArray(x[:local_size], bsize=gdim, comm=V.mesh.comm) # type: ignore for x in basis ] return PETSc.NullSpace().create(comm=V.mesh.comm, vectors=basis_petsc) # type: ignore
[docs] def determine_closest_block(V, point): """ Determine the closest dofs (in a single block) to a point and the distance """ # Create boundingboxtree of cells connected to boundary facets tdim = V.mesh.topology.dim boundary_facets = _mesh.exterior_facet_indices(V.mesh.topology) V.mesh.topology.create_connectivity(tdim - 1, tdim) f_to_c = V.mesh.topology.connectivity(tdim - 1, tdim) boundary_cells = [] for facet in boundary_facets: boundary_cells.extend(f_to_c.links(facet)) cell_imap = V.mesh.topology.index_map(tdim) boundary_cells = np.array(np.unique(boundary_cells), dtype=np.int32) boundary_cells = boundary_cells[boundary_cells < cell_imap.size_local] bb_tree = _geometry.bb_tree(V.mesh, tdim, entities=boundary_cells, padding=0.0) midpoint_tree = _geometry.create_midpoint_tree(V.mesh, tdim, boundary_cells) # Find facet closest point = np.reshape(point, (1, 3)).astype(V.mesh.geometry.x.dtype) closest_cell = _geometry.compute_closest_entity(bb_tree, midpoint_tree, V.mesh, point)[0] # Set distance high if cell is not owned if cell_imap.size_local < closest_cell or closest_cell == -1: R = 1e5 else: # Get cell geometry p = V.mesh.geometry.x V.mesh.topology.create_connectivity(tdim, tdim) entities = _mesh.entities_to_geometry(V.mesh, tdim, np.array([closest_cell], dtype=np.int32), False) R = np.linalg.norm(_geometry.compute_distance_gjk(point, p[entities[0]])) # Find processor with cell closest to point global_distances = MPI.COMM_WORLD.allgather(R) owning_processor = np.argmin(global_distances) dofmap = V.dofmap imap = dofmap.index_map ghost_owner = imap.owners local_max = imap.size_local # Determine which block of dofs is closest min_distance = max(R, 1e5) minimal_distance_block = None min_dof_owner = owning_processor if MPI.COMM_WORLD.rank == owning_processor: x = V.tabulate_dof_coordinates() cell_blocks = dofmap.cell_dofs(closest_cell) for block in cell_blocks: distance = np.linalg.norm(_geometry.compute_distance_gjk(point, x[block])) if distance < min_distance: # If cell owned by processor, but not the closest dof if block < local_max: min_dof_owner = MPI.COMM_WORLD.rank else: min_dof_owner = ghost_owner[block - local_max] minimal_distance_block = block min_distance = distance min_dof_owner = MPI.COMM_WORLD.bcast(min_dof_owner, root=owning_processor) # If dofs not owned by cell if owning_processor != min_dof_owner: owning_processor = min_dof_owner if MPI.COMM_WORLD.rank == min_dof_owner: # Re-search using the closest cell x = V.tabulate_dof_coordinates() cell_blocks = dofmap.cell_dofs(closest_cell) for block in cell_blocks: distance = np.linalg.norm(_geometry.compute_distance_gjk(point, x[block])) if distance < min_distance: # If cell owned by processor, but not the closest dof if block < local_max: min_dof_owner = MPI.COMM_WORLD.rank else: min_dof_owner = ghost_owner[block - local_max] minimal_distance_block = block min_distance = distance assert min_dof_owner == owning_processor return owning_processor, [minimal_distance_block] else: return owning_processor, []
def create_point_to_point_constraint(V, slave_point, master_point, vector=None): # Determine which processor owns the dof closest to the slave and master point slave_proc, slave_block = determine_closest_block(V, slave_point) master_proc, master_block = determine_closest_block(V, master_point) is_master_proc = MPI.COMM_WORLD.rank == master_proc is_slave_proc = MPI.COMM_WORLD.rank == slave_proc block_size = V.dofmap.index_map_bs imap = V.dofmap.index_map # Output structures slaves, masters, coeffs, owners, offsets = [], [], [], [], [] # Information required to handle vector as input zero_indices, slave_index = None, None if vector is not None: zero_indices = np.argwhere(np.isclose(vector, 0)).T[0] slave_index = np.argmax(np.abs(vector)) if is_slave_proc: assert len(slave_block) == 1 slave_block_g = imap.local_to_global(np.asarray(slave_block, dtype=np.int32))[0] if vector is None: slaves = np.arange( slave_block[0] * block_size, slave_block[0] * block_size + block_size, dtype=np.int32, ) else: assert len(vector) == block_size # Check for input vector (Should be of same length as number of slaves) # All entries should not be zero assert not np.isin(slave_index, zero_indices) # Check vector for zero contributions slaves = np.array([slave_block[0] * block_size + slave_index], dtype=np.int32) for i in range(block_size): if i != slave_index and not np.isin(i, zero_indices): masters.append(slave_block_g * block_size + i) owners.append(slave_proc) coeffs.append(-vector[i] / vector[slave_index]) global_masters = None if is_master_proc: assert len(master_block) == 1 master_block_g = imap.local_to_global(np.asarray(master_block, dtype=np.int32))[0] masters_as_glob = np.arange( master_block_g * block_size, master_block_g * block_size + block_size, dtype=np.int64 ) else: masters_as_glob = np.array([], dtype=np.int64) ghost_processors = [] shared_indices = dolfinx_mpc.cpp.mpc.compute_shared_indices(V._cpp_object) if is_master_proc and is_slave_proc: # If slaves and masters are on the same processor finalize local work if vector is None: masters = masters_as_glob owners = np.full(len(masters), master_proc, dtype=np.int32) coeffs = np.ones(len(masters), dtype=_dt) offsets = np.arange(0, len(masters) + 1, dtype=np.int32) else: for i in range(len(masters_as_glob)): if not np.isin(i, zero_indices): masters.append(masters_as_glob[i]) owners.append(master_proc) coeffs.append(vector[i] / vector[slave_index]) offsets = [0, len(masters)] else: # Send/Recv masters from other processor if is_master_proc: MPI.COMM_WORLD.send(masters_as_glob, dest=slave_proc, tag=10) if is_slave_proc: global_masters = MPI.COMM_WORLD.recv(source=master_proc, tag=10) for i, master in enumerate(global_masters): if not np.isin(i, zero_indices): masters.append(master) owners.append(master_proc) if vector is None: coeffs.append(1) else: coeffs.append(vector[i] / vector[slave_index]) if vector is None: offsets = np.arange(0, len(slaves) + 1, dtype=np.int32) else: offsets = np.array([0, len(masters)], dtype=np.int32) ghost_processors = shared_indices.links(slave_block[0]) # Broadcast processors containg slave ghost_processors = MPI.COMM_WORLD.bcast(ghost_processors, root=slave_proc) if is_slave_proc: for proc in ghost_processors: MPI.COMM_WORLD.send(slave_block_g * block_size + slaves % block_size, dest=proc, tag=20 + proc) MPI.COMM_WORLD.send(coeffs, dest=proc, tag=30 + proc) MPI.COMM_WORLD.send(owners, dest=proc, tag=40 + proc) MPI.COMM_WORLD.send(masters, dest=proc, tag=50 + proc) MPI.COMM_WORLD.send(offsets, dest=proc, tag=60 + proc) # Receive data for ghost slaves ghost_slaves, ghost_masters, ghost_coeffs, ghost_owners, ghost_offsets = [], [], [], [], [] if np.isin(MPI.COMM_WORLD.rank, ghost_processors): # Convert recieved slaves to the corresponding ghost index recv_slaves = MPI.COMM_WORLD.recv(source=slave_proc, tag=20 + MPI.COMM_WORLD.rank) ghost_coeffs = MPI.COMM_WORLD.recv(source=slave_proc, tag=30 + MPI.COMM_WORLD.rank) ghost_owners = MPI.COMM_WORLD.recv(source=slave_proc, tag=40 + MPI.COMM_WORLD.rank) ghost_masters = MPI.COMM_WORLD.recv(source=slave_proc, tag=50 + MPI.COMM_WORLD.rank) ghost_offsets = MPI.COMM_WORLD.recv(source=slave_proc, tag=60 + MPI.COMM_WORLD.rank) # Unroll ghost blocks ghosts = imap.ghosts ghost_dofs = [g * block_size + i for g in ghosts for i in range(block_size)] ghost_slaves = np.zeros(len(recv_slaves), dtype=np.int32) local_size = imap.size_local for i, slave in enumerate(recv_slaves): idx = np.argwhere(ghost_dofs == slave)[0, 0] ghost_slaves[i] = local_size * block_size + idx slaves = np.asarray(np.append(slaves, ghost_slaves), dtype=np.int32) masters = np.asarray(np.append(masters, ghost_masters), dtype=np.int64) coeffs = np.asarray(np.append(coeffs, ghost_coeffs), dtype=_dt) owners = np.asarray(np.append(owners, ghost_owners), dtype=np.int32) offsets = np.asarray(np.append(offsets, ghost_offsets), dtype=np.int32) return slaves, masters, coeffs, owners, offsets
[docs] def create_normal_approximation(V: _fem.FunctionSpace, mt: _cpp.mesh.MeshTags_int32, value: int): """ Creates a normal approximation for the dofs in the closure of the attached entities. Where a dof is attached to multiple entities, an average is computed. Args: V: The function space mt: The meshtag containing the indices value: Value for the entities in the mesh tag to compute normal on Returns: nh: The normal vector """ nh = _fem.Function(V) n_cpp = dolfinx_mpc.cpp.mpc.create_normal_approximation(V._cpp_object, mt.dim, mt.find(value)) nh._cpp_object = n_cpp return nh