Source code for utils.UtilsForceOpt

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 28 12:54:25 2021

@author: hwang
"""

import mujoco
from myoconverter.optimization.utils.UtilsMujoco import getMuscleForceLengthCurvesSim, updateMuscleForceProperties

import numpy as np
from numpy import pi, sqrt

from loguru import logger
import multiprocessing as mp
from typing import List

[docs]def initializeWorker(shared_model) -> None: """ Initialize the worker process. INPUTS: shared_model the mujoco model """ global mjc_model mjc_model = shared_model
@logger.catch
[docs]def objFMMuscle(x, osim_fm, muscle: str, joints: List[str], jnt_arr, act_arr): """ Calculate the muscle force differences between osim and mjc models. INPUTS: x: vector optimizing parameters osim_fm: vector/mat muscle force vector/matrix of a given muscle muscle: string muscle name joints: list of string a list of unique coordinate that affecting the muscle length jnt_arr: list a list of joint angle values for the above unique coordinates act_arr: list a list of muscle activation values OUTPUTS: rms_fm: double the RMS value of muscle force map differences """ global mjc_model mjc_model = updateMuscleForceProperties(mjc_model, muscle, x) mjc_fm, length_mtu = getMuscleForceLengthCurvesSim(mjc_model, muscle, joints, jnt_arr, act_arr) # calculate moment arm differences, osim and mjc has opposite sign in MA return np.sqrt(np.sum((np.array(osim_fm).flatten() + np.array(mjc_fm).flatten())**2)/len(np.array(mjc_fm).flatten()))
@logger.catch
[docs]def getMuscleForceDiff(mjc_model, muscles, joints, jnt_arr, act_arr, osim_fp_muscle_joints): ''' Check if the muscle force differences between the osim and mjc models are beyond thresholds ''' err_ind = [] force_mtu, length_mtu = getMuscleForceLengthCurvesSim(mjc_model, muscles, joints, jnt_arr, act_arr) # change ndarray to array osim_fp_muscles_joints_array = np.array(osim_fp_muscle_joints).flatten() mjc_fp_muscles_joints_array = np.array(force_mtu).flatten() if len(osim_fp_muscles_joints_array) != len(mjc_fp_muscles_joints_array): logger.debug("osim and mjc models have different moment arm sizes") raise('osim and mjc models have different moment arm sizes') obj_org = np.sqrt(np.sum((osim_fp_muscles_joints_array + mjc_fp_muscles_joints_array)**2)/len(mjc_fp_muscles_joints_array)) # absolute and relative errors abs_err = abs(osim_fp_muscles_joints_array + mjc_fp_muscles_joints_array) osim_fp_muscles_joints_array_nonzeros = np.maximum(abs(osim_fp_muscles_joints_array), 1e-3) rel_err = abs((osim_fp_muscles_joints_array + mjc_fp_muscles_joints_array)/(osim_fp_muscles_joints_array_nonzeros)) # check if absolulte error larger than 0.001 and relative error larger than 5% err_ind.append(list(set(np.where(abs_err > 0.001)[0]) & set(np.where(rel_err > 0.05)[0]))) return err_ind, length_mtu, obj_org
# optimize muscle forces using the self-developed PSO Optimizer @logger.catch
[docs]def fmOptPSO_cust(mjc_model_path, muscle, joints, jnt_arr, act_arr,\ osim_fm, optParam_lb, optParam_ub, cost_org, speedy = False): # make sure the boundaries are array if type(optParam_lb) != np.ndarray: optParam_lb = np.array(optParam_lb) if type(optParam_ub) != np.ndarray: optParam_ub = np.array(optParam_ub) # PSO options c1 = 0.3 # inherit rate from the best of the current particle itself c2 = 0.25 # inherit rate from the global best particle w = 0.25 # inherit rate from the previous velocity dimensions = len(optParam_lb) # optimizing parameter demensions if speedy: n_particles = 5*dimensions iteration_max = 25 break_threshold = 0.25 # when more than #% particles have the same values, then stop the optimization, else: n_particles = 20*dimensions # particle number is set to be 20 times the parameter demensions iteration_max = 50 break_threshold = 0.5 # initilize the optimization variables randomly inside the bounds x = optParam_lb + np.random.uniform(low = 0, high = 1,\ size = (n_particles, dimensions))*(optParam_ub - optParam_lb) # initilize the velocities randomly inside the bounds v = (optParam_lb - optParam_ub) + np.random.uniform(low = 0, high = 1,\ size = (n_particles, dimensions))*(optParam_ub - optParam_lb)*2 # the best cost function for each particle is initilized as 0 obj_b = np.zeros(n_particles) p = x # assign the particle values obj_g = [] # initilize an empty global cost function value obj_g_old = [] # old cost function value obj_g_iter = 0 # number of iterations that contain the same obj_g itera = 0 # interation starts similar_particles = 0 # number of similar particles, this will be used as an cretiria to stop the optimization shared_model = mujoco.MjModel.from_xml_path(mjc_model_path) with mp.Pool(initializer=initializeWorker, initargs=(shared_model,)) as pool: while itera < iteration_max: # Apply parallel computing using multiprocessing # Right now, the mujoco sim cannot be pickled and transfer to objective function # To solve this, mujoco file name need to transfer to objective function and load over there. # This will change the entire strucutre of the optimization process, not a day of work. # prepare function inputs x_input = [] for ix in x: x_input.append((ix, osim_fm, muscle, joints, jnt_arr, act_arr)) obj_list = pool.starmap(objFMMuscle, x_input) # update the local and global optimal if itera == 0: obj_b = obj_list obj_g = min(obj_list) g = x[obj_list.index(obj_g)] else: for iobj, obj in enumerate(obj_list): if obj < obj_b[iobj]: obj_b[iobj] = obj p[iobj] = x[iobj] if obj < obj_g: obj_g = obj g = x[iobj] # two random values to increase intersection between particles r1 = np.random.rand(1) r2 = np.random.rand(1) v = w*v + c1*r1*(p - x) + c2*r2*(g - x) # calculate velocities x = x + v # change to next iteration positions # make sure they are within boundaries for i, ix in enumerate(x): x[i] = np.minimum(np.maximum(ix, optParam_lb), optParam_ub) logger.info(f" PSO iteration: {itera} ; {round(similar_particles*100/n_particles)} percentage similarities; Best obj: {np.round(obj_g, 5)}") itera = itera + 1 # increase the iteration number similar_particles = len(np.where((obj_b - min(obj_b))/min(obj_b) < 0.01)[0]) # break if certain percentage of particles have similar objective values if similar_particles > break_threshold*n_particles: logger.info(" Break the optimization, since certain number of similar particles reached") break # break if the obj_g is the same value for more than 10 iterations if obj_g == obj_g_old: obj_g_iter = obj_g_iter + 1 else: obj_g_iter = 0 obj_g_old = obj_g if obj_g_iter > 10: logger.info(" Break the optimization, since global obj maintained the same value for certain iterations") break # update mjc model with new muscle parameters shared_model = updateMuscleForceProperties(shared_model, muscle, g) # save optimized results opt_results = {"cost_org": cost_org, "cost_opt": obj_g, "res_opt": g} return opt_results, shared_model