Skip to content

API: Core

matmdl.core

crystalPlasticity

This module contains functions relevant to the application of Huang's crystal plasticity subroutine.

do_orientation_inputs(next_params, orient, in_opt)

Get and write new orientation information, if necessary.

If input file is in orientation structure, uses that.

Source code in matmdl/core/crystalPlasticity.py
def do_orientation_inputs(next_params, orient, in_opt):
	"""
	Get and write new orientation information, if necessary.

	Args:
	    next_params: Next parameter set from the optimizer; orientation
	        components are looked up here when the orientation is optimized.
	    orient: Nickname key of the orientation in ``uset.orientations``.
	    in_opt: Input-option object; ``has_orient_opt[orient]`` says whether
	        this orientation is part of the optimization.

	If the orientation is not optimized and no input file is given,
	there is nothing to write.
	"""
	if not in_opt.has_orient_opt[orient] and "inp" not in uset.orientations[orient]:
		return

	if in_opt.has_orient_opt[orient]:
		# orientation is optimized: compute the vector components and write them
		orient_components = get_orient_info(next_params, orient, in_opt)
		writer.write_input_params(
			"mat_orient.inp", orient_components["names"], orient_components["values"]
		)
	else:
		if "inp" in uset.orientations[orient]:
			inp_files = uset.orientations[orient]["inp"]
			if len(inp_files) > 1:
				# if two filenames given, copy one to the other
				shutil.copy(inp_files[0], inp_files[1])
			else:
				# otherwise, copy the one given filename to the standard
				# orientation file name (fix: previously copied from a
				# hard-coded f"mat_orient_{orient}.inp", ignoring the
				# user-specified filename)
				shutil.copy(inp_files[0], "mat_orient.inp")

get_offset_angle(direction_og, direction_to, angle)

Iterative solution for finding vectors tilted toward other vectors.

Parameters:

Name Type Description Default
direction_og vector

Real space vector defining the original direction to be tilted away from.

required
direction_to vector

Real space vector defining the direction to be tilted towards.

required
angle float

The angle, in degrees, by which to tilt.

required

Returns:

Name Type Description
float float

a scalar multiplier such that the angle between direction_og and sol.x * direction_to is angle.

Source code in matmdl/core/crystalPlasticity.py
def get_offset_angle(direction_og: "vector", direction_to: "vector", angle: float) -> float:
	"""
	Numerically find the multiplier that tilts one vector toward another.

	Args:
	    direction_og: Real space vector defining
	        the original direction to be tilted away from.
	    direction_to: Real space vector defining
	        the direction to be tilted towards.
	    angle: The angle, in degrees, by which to tilt.

	Returns:
	    float:
	        a scalar multiplier ``m`` such that the angle between
	        ``direction_og`` and ``direction_og + m * direction_to``
	        is ``angle``.
	"""
	target_cos = np.cos(np.deg2rad(angle))

	def _residual(multiplier: float, v_og: "vector", v_to: "vector", tilt_deg: float):
		"""
		Zero when ``multiplier`` puts the tilted vector at the requested
		angle from the original; passed to the root finder, which adjusts
		only the first argument.
		"""
		tilted = v_og + multiplier * v_to
		cos_between = np.dot(v_og, tilted) / (norm(v_og) * norm(tilted))
		return cos_between - np.cos(np.deg2rad(tilt_deg))

	solution = root(_residual, 0.01, args=(direction_og, direction_to, angle), tol=1e-10)
	return solution.x[0]

get_orient_info(next_params, orient, in_opt)

Get components of orientation-defining vectors and their names for substitution into the orientation input files.

Parameters:

Name Type Description Default
next_params list

Next set of parameters to be evaluated by the optimization scheme.

required
orient str

Index string for dictionary of input orientations specified in :ref:orientations.

required
Source code in matmdl/core/crystalPlasticity.py
def get_orient_info(
	next_params: list,
	orient: str,
	in_opt: object,
) -> dict:
	"""
	Get components of orientation-defining vectors and their names
	for substitution into the orientation input files.

	Args:
	    next_params: Next set of parameters to be evaluated
	        by the optimization scheme.
	    orient: Index string for dictionary of input
	        orientations specified in :ref:`orientations`.
	    in_opt: Input-option object; supplies which orientation parameters
	        are optimized (``params``) and which are fixed (``fixed_vars``).

	Returns:
	    dict: ``"names"`` (input-file placeholder names) and ``"values"``
	    (the matching vector components) for substitution.
	"""
	# loading direction and the reference ("0 degree") direction for this orientation
	dir_load = np.asarray(uset.orientations[orient]["offset"]["dir_load"])
	dir_0deg = np.asarray(uset.orientations[orient]["offset"]["dir_0deg"])

	# offset magnitude: from the optimizer's parameter set if optimized,
	# otherwise from the fixed variables
	if orient + "_mag" in in_opt.params:
		index_mag = in_opt.params.index(orient + "_mag")
		angle_mag = next_params[index_mag]
	else:
		angle_mag = in_opt.fixed_vars[orient + "_mag"]

	# rotation angle about the loading axis: same optimized-vs-fixed lookup
	if orient + "_deg" in in_opt.params:
		index_deg = in_opt.params.index(orient + "_deg")
		angle_deg = next_params[index_deg]
	else:
		angle_deg = in_opt.fixed_vars[orient + "_deg"]

	# orthonormal basis from the loading axis, 0-degree axis, and their cross product
	col_load = unit_vector(np.asarray(dir_load))
	col_0deg = unit_vector(np.asarray(dir_0deg))
	col_cross = unit_vector(np.cross(col_load, col_0deg))

	basis_og = np.stack((col_load, col_0deg, col_cross), axis=1)
	# presumably a rotation about the basis' first (loading) axis — see _mk_x_rot
	rotation = _mk_x_rot(np.deg2rad(angle_deg))
	basis_new = np.matmul(basis_og, rotation)
	# second column of the rotated basis: the direction to tilt toward
	dir_to = basis_new[:, 1]

	if __debug__:  # write angle_deg rotation info
		dir_load = dir_load / norm(dir_load)
		dir_to = dir_to / norm(dir_to)
		dir_0deg = dir_0deg / norm(dir_0deg)
		with open("out_debug.txt", "a+") as f:
			f.write(f"orientation: {orient}")
			f.write(f"\nbasis OG: \n{basis_og}")
			f.write("\n")
			f.write(f"\nrotation: \n{rotation}")
			f.write("\n")
			f.write(f"\nbasis new: \n{basis_new}")
			f.write("\n\n")
			f.write(f"dir_load: {dir_load}\tdir_to: {dir_to}\n")
			f.write(f"angle_deg_inp: {angle_deg}\n")
			f.write(f"all params: {next_params}")

	# scalar so that dir_load + sol * dir_to sits angle_mag degrees off dir_load
	sol = get_offset_angle(dir_load, dir_to, angle_mag)
	dir_tot = dir_load + sol * dir_to
	# NOTE(review): divides by dir_tot[2]; fails if the tilted direction has no
	# z-component — confirm inputs guarantee dir_tot[2] != 0
	dir_ortho = np.array([1, 0, -dir_tot[0] / dir_tot[2]])

	if __debug__:  # write final loading orientation info
		angle_output = (
			np.arccos(np.dot(dir_tot, dir_load) / (norm(dir_tot) * norm(dir_load))) * 180.0 / np.pi
		)
		with open("out_debug.txt", "a+") as f:
			f.write(f"\ndir_tot: {dir_tot}")
			f.write(f"\ndir_ortho: {dir_ortho}")
			f.write(f"\nangle_mag_input: {angle_mag}\tangle_mag_output: {angle_output}")
			f.write("\n\n")

	component_names = ["x1", "y1", "z1", "u1", "v1", "w1"]
	component_values = list(dir_ortho) + list(dir_tot)

	return {"names": component_names, "values": component_values}

param_check(param_list)

True if tau0 >= tauS

In theory, tau0 should always come before tauS, even though it doesn't make a difference mathematically/practically. Function checks for multiple systems if numbered in the form TauS, TauS1, TauS2 and Tau0, Tau01, Tau02.

Note

Deprecated. Better to do this by mapping whatever hyper-rectangular input bounds to your acceptable parameter space. E.g. optimizing on tauS_shift on [0,10] and adding a derived parameter in the Abaqus inputs: tauS = tau0 + tauS_shift.

Source code in matmdl/core/crystalPlasticity.py
def param_check(param_list: list[str]):
	"""
	True if tau0 >= tauS

	In theory, tau0 should always come before tauS, even though it doesn't make a difference
	mathematically/practically. Function checks for multiple systems if numbered in the form
	``TauS``, ``TauS1``, ``TauS2`` and ``Tau0``, ``Tau01``, ``Tau02``.

	Note:
	    Deprecated. Better to do this by mapping whatever hyper-rectangular input bounds
	    to your acceptable parameter space. E.g. optimizing on `tauS_shift` on [0,10]
	    and adding a derived parameter in the Abaqus inputs: `tauS = tau0 + tauS_shift`.
	"""
	# TODO: ck if it's possible to satisfy this based on mat_params and bounds, raise helpful error
	tau0_list, tauS_list = [], []
	for sysnum in ["", "1", "2"]:
		if ("TauS" + sysnum in param_list) or ("Tau0" + sysnum in param_list):
			# use a context manager so the file is closed even if parsing raises
			with open(uset.param_file, "r") as f1:
				for line in f1:
					# NOTE(review): for sysnum == "" these prefixes also match the
					# numbered lines ("Tau01", "TauS2", ...) — confirm intended
					# value assumed to start at a fixed column in the param file
					if line.startswith("Tau0" + sysnum):
						tau0_list.append(float(line[7:]))
					if line.startswith("TauS" + sysnum):
						tauS_list.append(float(line[7:]))
	is_bad = any(tau0 >= tauS for tau0, tauS in zip(tau0_list, tauS_list))
	return is_bad

experimental

Contains the class for extracting and storing experimental data from plain text inputs for comparison to iterative solution attempts.

ExpData

Loads and stores experimental data.

Attributes:

Name Type Description
data dict

Indexed by orientation name defined in :ref:orientations, with values of max strain (internal: _max_strain) and raw, which houses the experimental stress strain data truncated by max strain.

Note

Experimental stress-strain data are expected as plaintext in two columns: strain (unitless), and stress (matching the CPFEM inputs, often MPa).

Source code in matmdl/core/experimental.py
class ExpData:
	"""
	Loads and stores experimental data.

	Attributes:
	    data (dict): Indexed by orientation name defined in :ref:`orientations`,
	        with values of max strain (internal: ``_max_strain``) and ``raw``,
	        which houses the experimental stress strain data truncated by max strain.

	Note:
	    Experimental stress-strain data are expected as plaintext in two columns:
	    strain (unitless), and stress (matching the CPFEM inputs, often MPa).

	"""

	def __init__(self, orientations: dict):
		# one entry per orientation: strain bounds plus the truncated raw data
		self.data = {}
		for orient in orientations.keys():
			expname = orientations[orient]["exp"]
			min_strain, max_strain = self._get_bounds(expname, orient)
			raw = self._get_SS(expname, min_strain, max_strain)
			self.data[orient] = {
				"max_strain": max_strain,
				"min_strain": min_strain,
				"raw": raw,
			}

	def tell_max_strain(self, orient: str):
		"""Convenience method to forget how data is laid out"""
		return self.data[orient]["max_strain"]

	def _load(self, fname: str):
		"""
		Load original experimental stress-strain data and order it by strain.

		Args:
		    fname: Filename for experimental stress-strain data

		Returns:
		    Data sorted ascending by strain, reversed when compression is used
		    (so the first row is nearest zero strain in either case).
		"""
		original_SS = np.loadtxt(fname, skiprows=1, delimiter=",")
		order = -1 if uset.is_compression else 1
		original_SS = original_SS[original_SS[:, 0].argsort()][::order]
		return original_SS

	def _get_bounds(self, fname: str, orient: str):
		"""get limiting bounds

		Combines per-orientation limits, global ``uset`` limits, and the
		strain range present in the data file itself, then picks the most
		restrictive combination.
		"""
		mins = []
		maxes = []

		# orientation limits:
		if "min_strain" in uset.orientations[orient].keys():
			mins.append(float(uset.orientations[orient]["min_strain"]))
		if "max_strain" in uset.orientations[orient].keys():
			orient_max_strain = float(uset.orientations[orient]["max_strain"])
			# a max_strain of exactly 0.0 means "no limit given"
			if orient_max_strain != 0.0:
				maxes.append(orient_max_strain)

		# global limits
		if hasattr(uset, "min_strain"):
			mins.append(uset.min_strain)
		if hasattr(uset, "max_strain"):
			if float(uset.max_strain) != 0.0:
				maxes.append(uset.max_strain)

		# data limits
		data = np.sort(np.loadtxt(fname, skiprows=1, delimiter=",")[:, 0])
		mins.append(data[0])
		maxes.append(data[-1])

		# get limiting bounds to use
		if uset.is_compression:  # negative numbers
			min_use = min(mins)
			max_use = max(maxes)
		else:
			min_use = max(mins)
			max_use = min(maxes)

		# dead debug branch; flip to True to trace bounds selection
		if False:
			print("dbg bounds: mins:", mins)
			print("dbg bounds: maxes:", maxes)
			print("dbg bounds: min:", min_use)
			print("dbg bounds: max:", max_use)

		return min_use, max_use

	def _get_max_strain(self, fname: str, orient: str):
		"""
		Take either user max strain or file max strain.

		NOTE(review): not called from ``__init__`` (which uses ``_get_bounds``);
		appears superseded — confirm before removing.

		Args:
		    fname: Filename for experimental stress-strain data
		"""
		if float(uset.max_strain) == 0.0:
			if uset.is_compression is True:
				max_strain = min(np.loadtxt(fname, skiprows=1, delimiter=",")[:, 0])
			else:
				max_strain = max(np.loadtxt(fname, skiprows=1, delimiter=",")[:, 0])
		else:
			max_strain = uset.max_strain if not uset.is_compression else (-1 * uset.max_strain)
		return max_strain

	def _get_min_strain(self, fname: str):
		"""
		Take either user min strain or minimum of experimental strain in file `fname`

		NOTE(review): not called from ``__init__`` (which uses ``_get_bounds``);
		appears superseded — confirm before removing.

		Args:
		    fname: Filename for experimental stress-strain data
		"""
		if float(uset.min_strain) == 0.0:
			if uset.is_compression is True:
				min_strain = max(np.loadtxt(fname, skiprows=1, delimiter=",")[:, 0])
			else:
				min_strain = min(np.loadtxt(fname, skiprows=1, delimiter=",")[:, 0])
		else:
			min_strain = uset.min_strain if not uset.is_compression else (-1 * uset.min_strain)
		return min_strain

	def _get_SS(self, fname: str, _min_strain: float, _max_strain: float):
		"""
		Limit experimental data to within min_strain to max_strain.

		Also writes the truncated data to ``temp_expSS.csv`` as a side effect.

		Args:
		    fname: Filename for experimental stress-strain data
		    _min_strain: lower strain cutoff; 0.0 disables the cutoff
		    _max_strain: upper strain cutoff; 0.0 disables the cutoff
		"""
		expSS = self._load(fname)

		if not _max_strain == 0.0:
			expSS = expSS[expSS[:, 0] <= _max_strain, :]
		if not _min_strain == 0.0:
			expSS = expSS[expSS[:, 0] >= _min_strain, :]

		np.savetxt("temp_expSS.csv", expSS, delimiter=",")
		return expSS
tell_max_strain(orient)

Convenience method to forget how data is laid out

Source code in matmdl/core/experimental.py
def tell_max_strain(self, orient: str):
	"""Convenience accessor that hides how the data dictionary is laid out."""
	orient_entry = self.data[orient]
	return orient_entry["max_strain"]

optimizer

Module for instantiating and updating the optimizer object.

InOpt

Stores information about the optimization input parameters.

Since the hardening parameters and orientation parameters are fundamentally different, this object stores information about both in such a way that they can still be accessed independently.

Parameters:

Name Type Description Default
orientations dict

Orientation information directly from opt_input.

required
params dict

name and bounds of parameters to be optimized.

required

Attributes:

Name Type Description
orients list

Nickname strings defining orientations, as given in :ref:orientations.

material_params list

Parameter names to be optimized, as in :ref:orientations.

material_bounds list

Tuple of floats defining bounds of parameter in the same index of self.params, again given in :ref:orientations.

orient_params list

Holds orientation parameters to be optimized, or single orientation parameters if not given as a tuple in :ref:orientations. These are labeled orientationNickName_deg for the degree value of the right hand rotation about the loading axis and orientationNickName_mag for the magnitude of the offset.

orient_bounds list

List of tuples corresponding to the bounds for the parameters stored in self.orient_params.

params list

Combined list consisting of both self.material_params and self.orient_params.

bounds list

Combined list consisting of both self.material_bounds and self.orient_bounds.

has_orient_opt dict

Dictionary with orientation nickname as key and boolean as value indicating whether slight loading offsets should be considered for that orientation.

fixed_vars dict

Dictionary with orientation nickname as key and any fixed orientation information (_deg or _mag) for that loading orientation that is not going to be optimized.

offsets list

List of dictionaries containing all information about the offset as given in the input file. Not used/called anymore?

num_params_material int

Number of material parameters to be optimized.

num_params_orient int

Number of orientation parameters to be optimized.

num_params_total int

Number of parameters to be optimized in total.

Note

Checks if orientations[orient]['offset']['deg_bounds'] in :ref:orientations is a tuple to determine whether orientation should also be optimized.

Source code in matmdl/core/optimizer.py
class InOpt:
	"""
	Stores information about the optimization input parameters.

	Since the hardening parameters and orientation parameters are fundamentally
	different, this object stores information about both in such a way that they
	can still be accessed independently.

	Args:
	    orientations (dict): Orientation information directly from ``opt_input``.
	    params (dict): name and bounds of parameters to be optimized.


	Attributes:
	    orients (list): Nickname strings defining orientations, as given
	        in :ref:`orientations`.
	    material_params (list): Parameter names to be optimized, as in :ref:`orientations`.
	    material_bounds (list): Tuple of floats defining bounds of parameter in the same
	        index of ``self.params``, again given in :ref:`orientations`.
	    orient_params (list): Holds orientation parameters to be optimized, or single
	        orientation parameters if not given as a tuple in :ref:`orientations`.
	        These are labeled ``orientationNickName_deg`` for the degree value of the
	        right hand rotation about the loading axis and ``orientationNickName_mag``
	        for the magnitude of the offset.
	    orient_bounds (list): List of tuples corresponding to the bounds for the parameters
	        stored in ``self.orient_params``.
	    params (list): Combined list consisting of both ``self.material_params`` and
	        ``self.orient_params``.
	    bounds (list): Combined list consisting of both ``self.material_bounds`` and
	        ``self.orient_bounds``.
	    has_orient_opt (dict): Dictionary with orientation nickname as key and boolean
	        as value indicating whether slight loading offsets should be considered
	        for that orientation.
	    fixed_vars (dict): Dictionary with orientation nickname as key and any fixed
	        orientation information (``_deg`` or ``_mag``) for that loading orientation
	        that is not going to be optimized.
	    offsets (list): List of dictionaries containing all information about the offset
	        as given in the input file. Not used/called anymore?
	    num_params_material (int): Number of material parameters to be optimized.
	    num_params_orient (int): Number of orientation parameters to be optimized.
	    num_params_total (int): Number of parameters to be optimized in total.

	Note:
	    Checks if ``orientations[orient]['offset']['deg_bounds']``
	    in :ref:`orientations` is a tuple to determine whether
	    orientation should also be optimized.
	"""

	# TODO: check if ``offsets`` attribute is still needed.
	def __init__(self, orientations, params):
		"""Sorted orientations here defines order for use in single list passed to optimizer."""
		self.orients = sorted(orientations.keys())
		(
			self.params,
			self.bounds,
			self.material_params,
			self.material_bounds,
			self.orient_params,
			self.orient_bounds,
		) = ([] for i in range(6))
		for param, bound in params.items():
			# NOTE(review): exact type() check — a bool or numpy scalar bound
			# would fall through to the TypeError branch; confirm intended
			if type(bound) in (list, tuple):  # pass ranges to optimizer
				self.material_params.append(param)
				self.material_bounds.append([float(b) for b in bound])
			elif type(bound) in (float, int):  # write single values to file
				write_input_params(uset.param_file, param, float(bound))
			else:
				raise TypeError("Incorrect bound type in input file.")

		# add orientation offset info:
		self.offsets = []
		self.has_orient_opt = {}
		self.fixed_vars = {}
		for orient in self.orients:
			if "offset" in orientations[orient].keys():
				self.has_orient_opt[orient] = True
				self.offsets.append({orient: orientations[orient]["offset"]})
				# ^ saves all info (TODO: check if still needed)

				# deg rotation *about* loading orientation:
				# tuple/list bounds -> optimized; scalar -> fixed value
				if isinstance(orientations[orient]["offset"]["deg_bounds"], (tuple, list)):
					self.orient_params.append(orient + "_deg")
					self.orient_bounds.append(
						[float(f) for f in orientations[orient]["offset"]["deg_bounds"]]
					)
				else:
					self.fixed_vars[(orient + "_deg")] = orientations[orient]["offset"][
						"deg_bounds"
					]

				# mag rotation *away from* loading:
				if isinstance(orientations[orient]["offset"]["mag_bounds"], (tuple, list)):
					self.orient_params.append(orient + "_mag")
					self.orient_bounds.append(
						[float(f) for f in orientations[orient]["offset"]["mag_bounds"]]
					)
				else:
					self.fixed_vars[(orient + "_mag")] = orientations[orient]["offset"][
						"mag_bounds"
					]

			else:
				self.has_orient_opt[orient] = False

		# combine material and orient info into one ordered list:
		self.params = self.material_params + self.orient_params
		self.bounds = as_float_tuples(self.material_bounds + self.orient_bounds)

		# descriptive stats on input object:
		self.num_params_material = len(self.material_params)
		self.num_params_orient = len(self.orient_params)
		self.num_params_total = len(self.params)
__init__(orientations, params)

Sorted orientations here defines order for use in single list passed to optimizer.

Source code in matmdl/core/optimizer.py
def __init__(self, orientations, params):
	"""Sorted orientations here defines order for use in single list passed to optimizer."""
	self.orients = sorted(orientations.keys())
	(
		self.params,
		self.bounds,
		self.material_params,
		self.material_bounds,
		self.orient_params,
		self.orient_bounds,
	) = ([] for i in range(6))
	for param, bound in params.items():
		if type(bound) in (list, tuple):  # pass ranges to optimizer
			self.material_params.append(param)
			self.material_bounds.append([float(b) for b in bound])
		elif type(bound) in (float, int):  # write single values to file
			write_input_params(uset.param_file, param, float(bound))
		else:
			raise TypeError("Incorrect bound type in input file.")

	# add orientation offset info:
	self.offsets = []
	self.has_orient_opt = {}
	self.fixed_vars = {}
	for orient in self.orients:
		if "offset" in orientations[orient].keys():
			self.has_orient_opt[orient] = True
			self.offsets.append({orient: orientations[orient]["offset"]})
			# ^ saves all info (TODO: check if still needed)

			# deg rotation *about* loading orientation:
			if isinstance(orientations[orient]["offset"]["deg_bounds"], (tuple, list)):
				self.orient_params.append(orient + "_deg")
				self.orient_bounds.append(
					[float(f) for f in orientations[orient]["offset"]["deg_bounds"]]
				)
			else:
				self.fixed_vars[(orient + "_deg")] = orientations[orient]["offset"][
					"deg_bounds"
				]

			# mag rotation *away from* loading:
			if isinstance(orientations[orient]["offset"]["mag_bounds"], (tuple, list)):
				self.orient_params.append(orient + "_mag")
				self.orient_bounds.append(
					[float(f) for f in orientations[orient]["offset"]["mag_bounds"]]
				)
			else:
				self.fixed_vars[(orient + "_mag")] = orientations[orient]["offset"][
					"mag_bounds"
				]

		else:
			self.has_orient_opt[orient] = False

	# combine material and orient info into one ordered list:
	self.params = self.material_params + self.orient_params
	self.bounds = as_float_tuples(self.material_bounds + self.orient_bounds)

	# descriptive stats on input object:
	self.num_params_material = len(self.material_params)
	self.num_params_orient = len(self.orient_params)
	self.num_params_total = len(self.params)

get_next_param_set(opt, in_opt)

Give next parameter set to try using current optimizer state.

Allow to sample bounds exactly, round all else to reasonable precision.

Source code in matmdl/core/optimizer.py
def get_next_param_set(opt: object, in_opt: object) -> list[float]:
	"""
	Give next parameter set to try using current optimizer state.

	Allow to sample bounds exactly, round all else to reasonable precision.
	"""
	if not state.next_params:
		# queue empty: ask the optimizer for a fresh batch of candidate sets
		for candidate in opt.ask(n_points=state.num_paramsets):
			cleaned = [
				# keep exact bound values as-is; round everything else
				value if value in bound else round_sig(value, sig=6)
				for value, bound in zip(candidate, in_opt.bounds)
			]
			state.next_params.append(cleaned)
	return state.next_params.popleft()

instantiate(in_opt, uset)

Define all optimization settings, return optimizer object.

Parameters:

Name Type Description Default
in_opt object

Input settings defined in :class:InOpt.

required
uset

User settings from input file.

required

Returns:

Type Description
object

skopt.Optimize: Instantiated optimization object.

Source code in matmdl/core/optimizer.py
def instantiate(in_opt: object, uset: object) -> object:
	"""
	Define all optimization settings, return optimizer object.

	Args:
	    in_opt: Input settings defined in :class:`InOpt`.
	    uset: User settings from input file.

	Returns:
	    skopt.Optimize: Instantiated optimization object.
	"""
	optimizer_settings = dict(
		dimensions=in_opt.bounds,
		base_estimator="gp",
		n_initial_points=uset.n_initial_points,
		initial_point_generator="lhs",
		acq_func="EI",
		acq_func_kwargs={"xi": 1.0},  # default is 0.01, higher values favor exploration
	)
	return Optimizer(**optimizer_settings)

load_previous(opt, search_local=False)

Load input files of previous optimizations to use as initial points in current optimization.

Looks for a file named out_progress.txt from which to load previous results. Requires access to global variable opt_progress that stores optimization output. The parameter bounds for the input files must be within current parameter bounds. Renumbers old/loaded results in opt_progress to have negative iteration numbers.

Parameters:

Name Type Description Default
opt object

Current instance of the optimizer object.

required
search_local bool

Look in the current directory for files (convenient for plotting from parallel instances).

False

Returns:

Type Description
object

skopt.Optimizer: Updated instance of the optimizer object.

Source code in matmdl/core/optimizer.py
def load_previous(opt: object, search_local: bool = False) -> object:
	"""
	Load input files of previous optimizations to use as initial points in current optimization.

	Looks for a file named ``out_progress.txt`` from which to load previous results.
	Requires access to global variable ``opt_progress`` that stores optimization output.
	The parameter bounds for the input files must be within current parameter bounds.
	Renumbers old/loaded results in ``opt_progress`` to have negative iteration numbers.

	Args:
	    opt: Current instance of the optimizer object.
	    search_local: Look in the current directory for files
	        (convenient for plotting from parallel instances).

	Returns:
	    skopt.Optimizer: Updated instance of the optimizer object.
	"""
	fname_params = "out_progress.txt"
	fname_errors = "out_errors.txt"

	# parallel instance: read the shared files from the main directory instead
	if uset.main_path not in [os.getcwd(), "."] and not search_local:
		fname_params = os.path.join(uset.main_path, fname_params)
		fname_errors = os.path.join(uset.main_path, fname_errors)

	# NOTE(review): np.loadtxt returns a 1-D array for a single-row file, which
	# would break the 2-D slicing below — confirm files always hold >= 2 rows
	params = np.loadtxt(fname_params, skiprows=1, delimiter=",")
	errors = np.loadtxt(fname_errors, skiprows=1, delimiter=",")
	x_in = params[:, 1:].tolist()  # drop the leading iteration/id column
	y_in = errors[:, -1].tolist()  # last column holds the combined error

	if __debug__:
		with open("out_debug.txt", "a+") as f:
			f.write("loading previous results\n")
			f.writelines([f"x_in: {x}\ty_in: {y}\n" for x, y in zip(x_in, y_in)])

	tic = time.time()
	log("Starting to reload previous data")
	opt.tell(x_in, y_in)
	log(f"Finished reloading previous data after {time.time()-tic:.2f} seconds.")
	return opt

update_if_needed(opt, in_params, in_errors)

Give params and errors to state, updating optimizer if needed.

Need is determined by state.num_paramsets, which is set by relative timing of the FEA and opt.tell() procedures.

in_params and state.last_params may contain duplicate updates from parallel instances, but these will be dealt with by opt.tell()

Source code in matmdl/core/optimizer.py
def update_if_needed(opt, in_params, in_errors):
	"""
	Give params and errors to state, updating optimizer if needed.

	Need is determined by state.num_paramsets, which is set by relative
	timing of the FEA and opt.tell() procedures.

	`in_params` and `state.last_params` may contain duplicate updates from
	parallel instances, but these will be dealt with by opt.tell()

	Args:
	    opt: optimizer object supporting ``tell(params, errors)``.
	    in_params: list whose first item is the parameter set just evaluated.
	    in_errors: list whose first item is the corresponding error value.
	"""
	if len(state.next_params) < 1:
		# no queued parameter sets left: flush everything accumulated in state
		# plus the current result to the optimizer in one tell() call
		update_params = []
		update_errors = []
		# check for old data stored in state:
		for params, errors in zip(state.last_params, state.last_errors):
			update_params.append(params)
			update_errors.append(errors)
		# add current information from args:
		update_params.append(in_params[0])
		update_errors.append(in_errors[0])
		# tell opt and clear state:
		# NOTE(review): ``state.TimeTell()()`` invokes the TimeTell instance
		# before the with-block — confirm the extra call is intended
		# (vs. ``with state.TimeTell():``)
		with state.TimeTell()():
			opt.tell(update_params, update_errors)
		state.last_params = []
		state.last_errors = []
	else:
		# parameter sets still queued: defer the (expensive) opt.tell() by
		# stashing this result in state for a later flush
		state.last_params.append(in_params[0])
		state.last_errors.append(in_errors[0])

parallel

Module for dealing with the present optimization being one of many simultaneous instances. This is presumed to be the case when the setting main_path has a value. Everything here should be called within a Checkout guard.

Checkout

Checkout shared resource without write collisions.

Source code in matmdl/core/parallel.py
class Checkout:
	"""Checkout shared resource without write collisions.

	Context manager (and decorator, via ``__call__``) that guards the file
	``fname`` with a ``<fname>.lck`` lockfile in ``uset.main_path``, or in
	the current directory when ``local=True``.
	"""

	def __init__(self, fname, local=False):
		# start: used for both the wait timeout and progress messages
		self.start = time.time()
		self.fname = fname
		if local:
			self.fpath = os.path.join(os.getcwd(), fname)
		else:
			self.source = os.getcwd()
			self.fpath = os.path.join(uset.main_path, fname)

	def __enter__(self):
		# give up if the resource stays locked longer than this
		cutoff_seconds = 420

		while True and time.time() - self.start < cutoff_seconds:
			lockfile_exists = os.path.isfile(self.fpath + ".lck")
			if lockfile_exists:
				# someone else holds the lock: report the holder (if readable) and wait
				try:
					with open(self.fpath + ".lck", "r") as f:
						source = f.read()
					msg(
						f"Waiting on Checkout for {time.time()-self.start:.3f} seconds from {source}"
					)
				except FileNotFoundError:
					# lock vanished between the isfile() check and the read
					msg(f"Waiting on Checkout for {time.time()-self.start:.3f} seconds")
				time.sleep(1)
			else:
				# claim the lock by appending our working directory to the lockfile
				with open(self.fpath + ".lck", "a+") as f:
					f.write(f"{os.getcwd()}\n")
				self.time_unlocked = time.time()
				# check for collisions
				time.sleep(0.010)  # allow potential collision cases to catch up
				try:
					with open(self.fpath + ".lck", "r") as f:
						lines = f.readlines()
				except FileNotFoundError:
					lines = []
				# more than one line means another process also wrote the lockfile
				if len(lines) != 1:
					warn("Warning: collision detected between processes:", RuntimeWarning)
					for line in lines:
						print(f"\t{line}", flush=True)
					warn("Reattempting to checkout resource", RuntimeWarning)
					try:
						os.remove(self.fpath + ".lck")
					except FileNotFoundError:
						pass  # only one process will successfully remove file
					time.sleep(4.0 * random.random())  # wait for a sec before restarting
					# NOTE(review): after this recursive retry returns, control
					# falls through to the msg/break below in this frame as well —
					# confirm the duplicate "Unlocked" path is benign
					self.__enter__()  # try again

				msg(f"Unlocked after {time.time()-self.start:.3f} seconds")
				break
		if time.time() - self.start > cutoff_seconds:
			raise RuntimeError(
				f"Error: waited for resource {self.fname} for longer than {cutoff_seconds}s, exiting."
			)

	def __exit__(self, exc_type, exc_value, exc_tb):
		# dead debug branch; flip to True to trace which process releases the lock
		if False:  # debugging
			with open(self.fpath + ".lck", "r") as f:
				source = f.read()
			print(f"Exit: rm lock from: {source}", flush=True)
		os.remove(self.fpath + ".lck")
		msg(f"Exiting Checkout after {time.time()-self.time_unlocked:.3f} seconds.")

	def __call__(self, fn):
		"""
		Decorator to use if whole function needs resource checked out.
		"""

		def decorator():
			with self:
				return fn()

		return decorator
__call__(fn)

Decorator to use if whole function needs resource checked out.

Source code in matmdl/core/parallel.py
def __call__(self, fn):
	"""
	Decorator to use if whole function needs resource checked out.
	"""

	def wrapped():
		# run fn with the shared resource held for its whole duration
		with self:
			return fn()

	return wrapped

assert_db_lengths_match()

loads and checks lengths of all output files; used for debugging

Source code in matmdl/core/parallel.py
def assert_db_lengths_match():
	"""loads and checks lengths of all output files; used for debugging

	Raises:
	    RuntimeError: if the .npy databases and out_*.txt files in
	        ``uset.main_path`` disagree on the number of stored iterations.
	"""
	lengths = []
	# .npy databases: the third axis counts stored iterations
	for npyfile in [f for f in os.listdir(uset.main_path) if f.endswith("npy")]:
		dat = np.load(os.path.join(uset.main_path, npyfile))
		lengths.append(np.shape(dat)[2])
	# text outputs: one row per iteration after the header line
	for outfile in [
		f for f in os.listdir(uset.main_path) if f.startswith("out_") and f.endswith(".txt")
	]:
		dat = np.loadtxt(os.path.join(uset.main_path, outfile), delimiter=",", skiprows=1)
		# NOTE(review): a single-row file loads as 1-D, making shape[0] the column
		# count here — confirm files always hold >= 2 data rows
		lengths.append(np.shape(dat)[0])

	if len(set(lengths)) > 1:
		# record the failure in the shared progress file so parallel runners see it
		error_time = time.time_ns()
		with open(os.path.join(uset.main_path, "out_progress.txt"), "a+") as f:
			f.write(f"{error_time}, ERROR from {os.getcwd()}")
		raise RuntimeError(f"mismatch in DB lengths at time: {error_time}")

check_parallel()

Starts parallel initialization if needed.

Note

This copies files from uset.main_path but does not reload the input file.

Source code in matmdl/core/parallel.py
def check_parallel():
	"""
	Starts parallel initialization if needed.

	Note:
		This copies files from `uset.main_path` but does not reload the input file.
	"""
	# running from the main directory means this is the primary process
	in_main_dir = uset.main_path == os.getcwd() or uset.main_path == "."
	if in_main_dir:
		return
	msg("Starting as a parallel instance")
	copy_files(file_patterns)

copy_files(file_patterns)

copy files from uset.main_path to runner dir

Source code in matmdl/core/parallel.py
def copy_files(file_patterns):
	"""copy files from uset.main_path to runner dir"""

	# exact filenames always needed:
	needed = ["input.toml"]
	for orient_settings in uset.orientations.values():  # no need for ordering here
		needed.append(orient_settings["exp"])
		try:
			needed.append(orient_settings["inp"][0])
		except KeyError:
			# orientation generated, no input file needed
			pass

	# file list defined in engines:
	for candidate in os.listdir(uset.main_path):
		for pattern in file_patterns:
			if re.search(pattern, candidate):
				needed.append(candidate)

	# copy files to current directory
	for fname in needed:
		copy(os.path.join(uset.main_path, fname), os.getcwd())

update_parallel()

Update state if needed based on shared database timing information.

Returns:

Name Type Description
params list

parameter values (list) of unseen points to be updated

errors list

error values (scalar) of unseen points to be updated

Note

Also updates state.last_updated timing information.

Source code in matmdl/core/parallel.py
def update_parallel():
	"""
	Update state if needed based on shared database timing information.

	Returns:
		params (list): parameter values (list) of unseen points to be updated
		errors (list): error values (scalar) of unseen points to be updated

	Note:
		Also updates `state.last_updated` timing information.
	"""
	# the main process has no other process to pull updates from
	if uset.main_path in [os.getcwd(), "."]:
		return ([], [])

	num_newlines = _get_num_newlines()
	if num_newlines < 1:
		return ([], [])

	# update state:
	num_lines = _get_totlines()
	start_line = num_lines - num_newlines + 1
	update_params = np.loadtxt(
		os.path.join(uset.main_path, "out_progress.txt"),
		delimiter=",",
		skiprows=start_line,
	)
	update_errors = np.loadtxt(
		os.path.join(uset.main_path, "out_errors.txt"),
		delimiter=",",
		skiprows=start_line,
	)

	# strict output database assertion:
	# assert_db_lengths_match()

	# normalize both arrays to 2D so a single new row is handled the same as
	# many; previously the errors array's dimensionality was inferred from the
	# params array's shape, which could miscount rows on a size mismatch
	update_params = np.atleast_2d(update_params)
	update_errors = np.atleast_2d(update_errors)
	len_params = np.shape(update_params)[0]
	len_errors = np.shape(update_errors)[0]
	assert (
		len_params == len_errors
	), f"Error: mismatch in output database size! Found {len_params} params and {len_errors} errors"

	update_params_pass = []
	update_errors_pass = []
	for i in range(len_params):
		update_params_pass.append(list(update_params[i, 1:]))  # first value is time
		update_errors_pass.append(float(update_errors[i, -1]))  # last value is mean

	state.update_read()
	return update_params_pass, update_errors_pass

parser

Module that loads and checks input file.

UserSettings

Load, check, and store input from the input file.

Note

Attributes must be written/deleted within an unlock context manager and should not be overwritten during the optimization, since changing behavior makes the history harder to follow.

Source code in matmdl/core/parser.py
class UserSettings:
	"""
	Load, check, and store input from the input file.

	Note:
		Attributes must be written/deleted within an unlock context
		manager and should not be overwitten during the optimization
		since changing behavior makes the history harder to follow.
	"""

	class Option:
		"""Options that are commonly associated with each input."""

		def __init__(self, **kwargs):
			"""Defaults for option instances."""
			if "crit" in kwargs:
				self.crit = kwargs.get("crit")
			else:
				self.crit = True
			if "types" in kwargs:
				self.types = kwargs.get("types")
			else:
				self.types = []
			if "lower" in kwargs:
				self.lower = kwargs.get("lower")
			else:
				self.lower = None
			if "upper" in kwargs:
				self.upper = kwargs.get("upper")
			else:
				self.upper = None
			if "default" in kwargs:
				self.default = kwargs.get("default")

	input_reqs = {
		"run": {
			"loop_len": Option(types=[int], lower=2),
			"n_initial_points": Option(types=[int], lower=2),
			"large_error": Option(types=[int, float]),
			"param_file": Option(types=[str]),
			"length": Option(types=[int, float]),
			"area": Option(types=[int, float]),
			"jobname": Option(types=[str]),
			"recursion_depth": Option(types=[int]),
			"max_strain": Option(types=[int, float], crit=False, default=0.0),
			"min_strain": Option(types=[int, float], crit=False, default=0.0),
			"i_powerlaw": Option(types=[int]),
			"umat": Option(types=[str, bool], crit=False, default=False),
			"cpus": Option(types=[int]),
			"do_load_previous": Option(types=[bool, int]),
			"is_compression": Option(types=[bool]),
			"slope_weight": Option(types=[int, float], crit=False, default=0.4),
			"main_path": Option(types=[str], crit=False, default=os.getcwd()),
			"format": Option(types=[str], crit=False, default="huang"),
			"executable_path": Option(types=[str, bool], crit=False, default=False),
			"error_deviation_weight": Option(types=[float], crit=False, default=0.10, lower=0.0, upper=1.0),
			"do_single": Option(types=[bool], crit=False, default=False),
		},
		"plot": {
			"grain_size_name": Option(crit=False, types=[str]),
			"title": Option(crit=False, types=[str]),
			"param_additional_legend": Option(crit=False, types=[str]),
		},
	}

	def __init__(self, input_fname="input.toml"):
		categories = ["run", "plot"]
		with open(input_fname, "rb") as f:
			conf = tomllib.load(f)

		# write params:
		with self.unlock():
			self.params = conf["params"]
			if len(conf["orientations"]) > 0:
				self.orientations = {}
				for orient in conf["orientations"]:
					self.orientations[orient["name"]] = orient

			# get all input:
			for category in categories:
				for key, value in conf[category].items():
					if key not in self.input_reqs[category].keys():
						raise AttributeError(f"Unknown input: {key}")
					self.__dict__[key] = value

			# check if defaults needed:
			for category in categories:
				for key, value in self.input_reqs[category].items():
					if key not in self.__dict__:
						try:
							print(
								f"Input warning: input {key} not found, using default value of {value.default}"
							)
							self.__dict__[key] = value.default
						except AttributeError:
							raise AttributeError(f"\nInput: no default found for option {key}\n")

		# general checks:
		for key, req in self.input_reqs["run"].items():
			if key not in self.__dict__.keys():
				if req.crit is True:
					raise AttributeError(f"Missing critical input: {key}")
				else:
					continue
			value = self.__dict__[key]
			if req.types:
				input_type = type(value)
				if input_type not in req.types:
					raise AttributeError(f"Input type of {input_type} not one of {req.types}")
			if req.lower:
				if value < req.lower:
					raise ValueError(
						f"Input of {value} for `{key}` is below lower bound of {req.lower}"
					)
			if req.upper:
				value = self.__dict__[key]
				if value > req.upper:
					raise ValueError(
						f"Input of {value} for `{key}` is above upper bound of {req.upper}"
					)

		# check if this is a single run
		any_bounds = False
		for param_name, param_value in self.params.items():
			if type(param_value) in [list, tuple]:
				any_bounds = True
		with self.unlock():
			if not any_bounds:
				self.do_single = True
				log("Warning: parser: no bounded parameters in input file, running single.")
			else:
				self.do_single = False

		# individual checks:
		if self.i_powerlaw not in [0, 1]:
			raise NotImplementedError(f"No known option for i_powerlaw: {self.i_powerlaw}")
		if self.n_initial_points > self.loop_len:
			raise ValueError(
				f"Input initial points ({self.n_initial_points}) greater than total iterations ({self.loop_len})"
			)
		if self.format.lower() not in ["huang", "fepx"]:
			raise ValueError(
				f"Unexpected format option {self.format}; should be either huang or fepx."
			)
		# TODO add more individual checks if needed

	@contextmanager
	def unlock(self):
		self.is_locked = False
		try:
			yield self
		finally:
			self.is_locked = True

	def __setattr__(self, name, value):
		if name == "is_locked" or self.is_locked is False:
			super().__setattr__(name, value)
		else:
			raise AttributeError(self, "Don't touch my stuff")

	def __delattr__(self, name, value):
		if self.is_locked is False:
			super().__delattr__(name, value)
		else:
			raise AttributeError(self, "Don't wreck my stuff")
Option

Options that are commonly associated with each input.

Source code in matmdl/core/parser.py
class Option:
	"""Options that are commonly associated with each input."""

	def __init__(self, **kwargs):
		"""Store option fields, falling back to defaults when absent."""
		self.crit = kwargs.get("crit", True)
		self.types = kwargs.get("types", [])
		self.lower = kwargs.get("lower", None)
		self.upper = kwargs.get("upper", None)
		# `default` is deliberately left unset when not given, so a missing
		# default surfaces as AttributeError at lookup time
		if "default" in kwargs:
			self.default = kwargs["default"]
__init__(**kwargs)

Defaults for option instances.

Source code in matmdl/core/parser.py
def __init__(self, **kwargs):
	"""Store option fields, falling back to defaults when absent."""
	self.crit = kwargs.get("crit", True)
	self.types = kwargs.get("types", [])
	self.lower = kwargs.get("lower", None)
	self.upper = kwargs.get("upper", None)
	# `default` is deliberately left unset when not given, so a missing
	# default surfaces as AttributeError at lookup time
	if "default" in kwargs:
		self.default = kwargs["default"]

runner

Helper module for some abstracted commands used in run.

check_single()

rough copy of run/single_loop that does not use an optimizer object

Source code in matmdl/core/runner.py
def check_single():
	"""rough copy of run/single_loop that does not use an optimizer object"""
	if not uset.do_single:
		return
	print("DBG: starting single run!")

	# load options:
	in_opt = optimizer.InOpt(uset.orientations, uset.params)
	next_params = []
	exp_data = ExpData(uset.orientations)  # noqa: F841
	# above line to make main input files with correct strain magnitude

	# check that there are no parameter ranges in the input (single runs
	# require fully prescribed scalar parameters)
	for param_name, param_value in uset.params.items():
		if type(param_value) in [list, tuple]:
			raise TypeError(
				f"Expected prescribed parameters for single run; found parameter bounds for {param_name}"
			)

	engine.prepare()
	for orient in in_opt.orients:
		do_orientation_inputs(next_params, orient, in_opt)
		try:
			shutil.copy(f"{uset.jobname}_{orient}.inp", f"{uset.jobname}.inp")
		except FileNotFoundError:
			# per-orientation inputs not used, could be a single input run
			pass

		engine.run()
		if not engine.has_completed():
			print(f"DBG: refining orient {orient}")
			refine_run()
		if not engine.has_completed():
			# still incomplete after refinement: give up with a failure code
			print(f"DBG: not complete with {orient}, exiting...")
			sys.exit(1)
		else:
			# remove stale extraction before producing a fresh one
			output_fname = f"temp_time_disp_force_{orient}.csv"
			if os.path.isfile(output_fname):
				os.remove(output_fname)
			engine.extract(orient)  # extract data to temp_time_disp_force.csv
			# an all-zero force column means the run produced no usable data;
			# NOTE(review): this `return` skips remaining orientations and the
			# final sys.exit(0) — confirm that is intended
			if np.sum(np.loadtxt(output_fname, delimiter=",", skiprows=1)[:, 1:2]) == 0:
				print(f"Warning: incomplete run for {orient}, continuing...")
				return
	print("DBG: exiting single run!")
	sys.exit(0)

get_first(opt, in_opt, exp_data)

Run one simulation so its output dimensions can later inform the shape of output data.

Source code in matmdl/core/runner.py
def get_first(opt, in_opt, exp_data) -> None:
	"""
	Run one simulation so its output dimensions can later inform the shape of output data.
	"""
	# test with strain of 0.2%
	engine.write_strain(uset.length * 0.002, f"{uset.jobname}.inp")
	# state.TimeRun() returns a context-manager class; the second call
	# instantiates it, timing this run into state.run_time
	with state.TimeRun()():
		engine.run()
	if not engine.has_completed():
		refine_run()
	engine.extract("initial")
	# reset to first max_strain; if multiple samples, will be overwritten anyway
	first_sample = list(exp_data.data.keys())[0]
	engine.write_strain(exp_data.data[first_sample]["max_strain"], f"{uset.jobname}.inp")

refine_run(ct=0)

Restart simulation with smaller maximum increment size.

Cut max increment size by factor (hardcoded), possibly multiple times up to uset.recursion_depth or until Abaqus finished successfully. After eventual success or failure, rewrites original input file so that the next run starts with the initial, large maximum increment. Recursive calls tracked through ct parameter.

Parameters:

Name Type Description Default
ct int

Number of times this function has already been called. Starts at 0 and can go up to uset.recursion_depth.

0
Source code in matmdl/core/runner.py
def refine_run(ct: int = 0):
	"""
	Restart simulation with smaller maximum increment size.

	Cut max increment size by ``factor`` (hardcoded), possibly multiple
	times up to ``uset.recursion_depth`` or until Abaqus finished successfully.
	After eventual success or failure, rewrites original input file so that the
	next run starts with the initial, large maximum increment.
	Recursive calls tracked through ``ct`` parameter.

	Args:
	    ct: Number of times this function has already been called. Starts
	        at 0 and can go up to ``uset.recursion_depth``.
	"""
	import glob  # local import keeps module-level imports untouched

	if uset.format == "fepx":
		# TODO should separate out all engine-specific calls
		# and raise NotImplemented errors from there if applicable
		return
	factor = 5.0
	ct += 1
	# remove old lock file(s) from previous unfinished simulation;
	# glob + os.remove replaces `subprocess.run("rm *.lck", shell=True)`:
	# no shell dependency, portable, and silent when no lock files exist
	for lockfile in glob.glob("*.lck"):
		os.remove(lockfile)
	filename = uset.jobname + ".inp"
	tempfile = "temp_input.txt"
	with open(filename, "r") as f:
		lines = f.readlines()

	# exit strategy:
	if ct == 1:  # need to save original parameters outside of this recursive function
		with open(tempfile, "w") as f:
			f.writelines(lines)

	def write_original(filename):
		# restore the pristine input saved on the first recursion level
		with open(tempfile, "r") as f:
			lines = f.readlines()
		with open(filename, "w") as f:
			f.writelines(lines)

	# find line after step line:
	step_line_ind = [i for i, line in enumerate(lines) if line.lower().startswith("*static")][0] + 1
	step_line = [number.strip() for number in lines[step_line_ind].strip().split(",")]
	original_increment = float(step_line[-1])

	# use original / factor for the max increment (last field of the step line):
	new_step_line = step_line[:-1] + ["%.4E" % (original_increment / factor)]
	new_step_line_str = ", ".join(new_step_line) + "\n"
	with open(filename, "w") as f:
		f.writelines(lines[:step_line_ind])
		f.writelines(new_step_line_str)
		f.writelines(lines[step_line_ind + 1 :])
	engine.run()
	if engine.has_completed():
		write_original(filename)
		return
	elif ct >= uset.recursion_depth:
		write_original(filename)
		return
	else:
		refine_run(ct)

remove_out_files()

Delete files from previous optimization runs if not reloading results.

Source code in matmdl/core/runner.py
def remove_out_files():
	"""Delete files from previous optimization runs if not reloading results."""
	if not uset.do_load_previous:
		# result/output/temp artifacts from a previous optimization
		for fname in os.listdir(os.getcwd()):
			if fname.startswith(("out_", "res_", "temp_")):
				os.remove(fname)
	# leftover job artifacts (everything except the input deck itself)
	for fname in os.listdir(os.getcwd()):
		if fname.startswith(uset.jobname) and not fname.endswith(".inp"):
			if os.path.isdir(fname):
				os.rmdir(fname)
			else:
				os.remove(fname)

state

Module for optimization state meta-indicators, like iteration number and time.

State

Contains and updates iteration and timing for each optimization process.

Attributes:

Name Type Description
iterations int

number of iterations performed by this process

last_updated int

time in unix nanoseconds of the last update to the optimizer state from any process

tell_time float

duration of time in seconds for the opt.tell process

run_time float

duration of time in seconds for a single iteration of the run process

Note

Warns when tell_time > run_time but does not change behavior.

Source code in matmdl/core/state.py
class State:
	"""
	Contains and updates iteration and timing for each optimization process.

	Attributes:
	    iterations (int): number of iterations performed by this process
	    last_updated (int): time in unix nanoseconds of the last update to the
	        optimizer state from any process
	    tell_time (float): duration of time in seconds for the opt.tell process
	    run_time (float): duration of time in seconds for a single iteration of the
	        run process

	Note:
	    Warns when `tell_time` > `run_time` but does not change behavior.
	"""

	def __init__(self):
		self.iterations = 0
		self.last_updated = time.time_ns()
		self.tell_time = 0.0
		self.run_time = 0.0
		# queue of parameter sets proposed but not yet consumed
		self.next_params = deque()
		self.last_params = []
		self.last_errors = []
		# sequence length; grown when telling outpaces running (see TimeTell)
		self.num_paramsets = 1

	def update_write(self):
		# called when this process contributes new results
		self.iterations += 1
		self.last_updated = time.time_ns()

	def update_read(self):
		# called when this process ingests results from elsewhere
		self.last_updated = time.time_ns()

	def TimeRun(self):
		# factory returning a context-manager class that closes over `self`,
		# so the inner __exit__ can store the measured duration on this State
		class TimeRun:
			def __enter__(innerself):
				innerself.tic = time.time()

			def __exit__(innerself, exc_type, exc_value, exc_tb):
				self.run_time = time.time() - innerself.tic

		return TimeRun

	def TimeTell(self):
		# like TimeRun, but also warns and increments num_paramsets when the
		# tell step takes longer than a simulation run
		class TimeTell:
			def __enter__(innerself):
				innerself.tic = time.time()

			def __exit__(innerself, exc_type, exc_value, exc_tb):
				new_time_tell = time.time() - innerself.tic
				if new_time_tell > self.run_time:
					warn(
						f"Taking longer to tell than to run: {new_time_tell:.1f} vs {self.run_time:.1f} seconds. Incrementing sequence length from {self.num_paramsets}.",
						RuntimeWarning,
					)
					self.num_paramsets += 1
				self.tell_time = new_time_tell

		return TimeTell

utilities

Utility functions that fit nowhere else. These should have no matmdl dependencies.

as_float_tuples(list_of_tuples)

Make sure tuples contain only floats.

Take list of tuples that may include ints and return list of tuples containing only floats. Useful for optimizer param bounds since type of input determines type of param guesses. Skips non-tuple items in list.

Parameters:

Name Type Description Default
list_of_tuples list[tuple[Union[int, float]]]

Tuples in this list may contain a mix of floats and ints.

required

Returns: The same list of tuples containing only floats.

Source code in matmdl/core/utilities.py
def as_float_tuples(
	list_of_tuples: list[tuple[Union[int, float]]],
) -> list[tuple[float]]:
	"""
	Make sure tuples contain only floats.

	Take list of tuples that may include ints and return list of tuples
	containing only floats. Useful for optimizer param bounds since type of
	input determines type of param guesses. Skips non-tuple items in list.

	Args:
	    list_of_tuples: Tuples in this list may contain a mix of
	        floats and ints.
	Returns:
	    The same list of tuples containing only floats.
	"""
	prec = 10  # decimal places in scientific notation

	def sigfig(val):
		# round-trip through scientific notation to force float type
		return float(f"%.{prec}e" % val)

	return [
		tuple(sigfig(entry) for entry in item) if isinstance(item, tuple) else item
		for item in list_of_tuples
	]

log(message)

append message to local log file with time stamp

Source code in matmdl/core/utilities.py
def log(message: str):
	"""append message to local log file with time stamp"""
	stamp = f"{time.time():.2f}"
	with open("out_log.txt", "a+") as logfile:
		logfile.write(f"{stamp}: {message}\n")

msg(message)

broadcast message to stdout if not run with -0

Source code in matmdl/core/utilities.py
def msg(message: str):
	"""broadcast message to stdout if not run with -0"""
	if not __debug__:
		return
	print(message, flush=True)

unit_vector(vector)

Gives a normalized vector using numpy.linalg.norm.

Source code in matmdl/core/utilities.py
def unit_vector(vector: "vector") -> "vector":
	"""Gives a normalized vector using ``numpy.linalg.norm``."""
	magnitude = norm(vector)
	return vector / magnitude

warn(message, warn_type=UserWarning)

Raise warning with consistent formatting

Source code in matmdl/core/utilities.py
def warn(message: str, warn_type=UserWarning):
	"""Raise warning with consistent formatting"""

	def _format(message, category, *args, **kwargs):
		# short single-line format: "<WarningType>: <message>"
		return f"{category.__name__}: {message}\n"

	warnings.formatwarning = _format
	warnings.warn(message, warn_type)

writer

Module for writing to files.

combine_SS(zeros, orientation)

Reads npy stress-strain output and appends current results.

Loads from temp_time_disp_force_{orientation}.csv and writes to out_time_disp_force_{orientation}.npy. Should only be called after all orientations have run, since zeros==True if any one fails.

For parallel, needs to be called within a parallel.Checkout guard.

Parameters:

Name Type Description Default
zeros bool

True if the run failed and a sheet of zeros should be written in place of real time-force-displacement data.

required
orientation str

Orientation nickname to keep temporary output files separate.

required
Source code in matmdl/core/writer.py
def combine_SS(zeros: bool, orientation: str) -> None:
	"""
	Reads npy stress-strain output and appends current results.

	Loads from ``temp_time_disp_force_{orientation}.csv`` and writes to
	``out_time_disp_force_{orientation}.npy``. Should only be called after all
	orientations have run, since ``zeros==True`` if any one fails.

	For parallel, needs to be called within a parallel.Checkout guard.

	Args:
	    zeros: True if the run failed and a sheet of zeros should be written
	        in place of real time-force-displacement data.
	    orientation: Orientation nickname to keep temporary output files separate.
	"""
	out_fpath = os.path.join(uset.main_path, f"out_time_disp_force_{orientation}.npy")
	sheet = np.loadtxt(f"temp_time_disp_force_{orientation}.csv", delimiter=",", skiprows=1)
	if zeros:
		# failed run: keep the shape but blank the data
		sheet = np.zeros(np.shape(sheet))
	if os.path.isfile(out_fpath):
		# stack the new sheet onto prior iterations along the third axis
		existing = np.load(out_fpath)
		sheet = np.dstack((existing, sheet))
	np.save(out_fpath, sheet)

write_error_to_file(error_list, orient_list, combination_function)

Write error values separated by orientation, if applicable.

Parameters:

Name Type Description Default
error_list list[float]

List of floats indicating error values for each orientation in orient_list, with which this list shares an order.

required
orient_list list[str]

List of strings holding orientation nicknames.

required
Source code in matmdl/core/writer.py
def write_error_to_file(
	error_list: list[float], orient_list: list[str], combination_function
) -> None:
	"""
	Write error values separated by orientation, if applicable.

	Args:
	    error_list: List of floats indicating error values for each orientation
	        in ``orient_list``, with which this list shares an order.
	    orient_list: List of strings holding orientation nicknames.
	"""
	error_fpath = os.path.join(uset.main_path, "out_errors.txt")
	# write a one-line header the first time the file is created
	if not os.path.isfile(error_fpath):
		with open(error_fpath, "w+") as f:
			f.write(f"# errors for {orient_list} and combined error\n")

	# per-orientation errors followed by the combined error
	all_errors = error_list + [combination_function(error_list)]
	with open(error_fpath, "a+") as f:
		f.write(",".join([f"{err:.8e}" for err in all_errors]) + "\n")

write_input_params(fname, param_names, param_values, debug=False)

Write parameter values to file with = as separator.

Used for material and orientation input files.

Parameters:

Name Type Description Default
fname str

Name of file in which to look for parameters.

required
param_names Union[list[str], str]

List of strings (or single string) describing parameter names. Shares order with param_values.

required
param_values Union[list[float], float]

List of parameter values (or single value) to be written. Shares order with param_names.

required
Note

Finds first match in file, so put derived parameters at end of file.

Source code in matmdl/core/writer.py
def write_input_params(
	fname: str,
	param_names: Union[list[str], str],
	param_values: Union[list[float], float],
	debug=False,
) -> None:
	"""
	Write parameter values to file with ``=`` as separator.

	Used for material and orientation input files.

	Args:
	    fname: Name of file in which to look for parameters.
	    param_names: List of strings (or single string) describing parameter names.
	        Shares order with ``param_values``.
	    param_values: List of parameter values (or single value) to be written.
	        Shares order with ``param_names``.

	Note:
	    Finds first match in file, so put derived parameters at end of file.
	"""
	match uset.format:
		case "huang":
			separator = " = "
		case "fepx":
			separator = " "
	if type(param_names) not in (list, tuple) and type(param_values) not in (
		list,
		tuple,
	):
		param_names = [param_names]
		param_values = [param_values]
	elif len(param_names) != len(param_values):
		raise IndexError("Length of names must match length of values.")

	with open(fname, "r") as f1:
		lines = f1.readlines()

	newlines = lines.copy()
	for param_name, param_value in zip(param_names, param_values):
		for i, line in enumerate(lines):
			match = re.search(r"\b" + param_name + r"[ |=]", line)
			if match:
				newlines[i] = (
					line[0 : match.start()] + param_name + separator + str(param_value) + "\n"
				)
				break  # go on to next param set

	with open("temp_" + fname, "w+") as f2:
		f2.writelines(newlines)

	if not debug:
		os.remove(fname)
		os.rename("temp_" + fname, fname)

write_params_to_file(param_values, param_names)

Appends last iteration params to file out_progress.txt.

Source code in matmdl/core/writer.py
def write_params_to_file(param_values: list[float], param_names: list[str]) -> None:
	"""Appends last iteration params to file `out_progress.txt`."""
	header_cols = ["time_ns"] + param_names
	out_fpath = os.path.join(uset.main_path, "out_progress.txt")

	add_header = not os.path.isfile(out_fpath)
	with open(out_fpath, "a+") as f:
		if add_header:
			# pad each column name to the width of one formatted value:
			# 8 decimals plus 6 other digits
			padded = [header_cols[0] + 12 * " "]
			for col_name in header_cols[1:]:
				padded.append(col_name + (8 + 6 - len(col_name)) * " ")
			f.write(", ".join(padded) + "\n")
		state.update_write()
		values_str = ", ".join([f"{a:.8e}" for a in param_values]) + "\n"
		f.write(str(state.last_updated) + ", " + values_str)