#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Jul 28 12:54:25 2021
@author: hwang
"""
import mujoco
from myoconverter.optimization.utils.UtilsMujoco import updateWrapSites, computeMomentArmMuscleJoints,\
getMuscleForceLengthCurvesSim, updateMuscleForceProperties
import numpy as np
from numpy import pi, sqrt
from loguru import logger
import multiprocessing as mp
[docs]def initializeWorker(shared_model) -> None:
"""
Initialize the worker process.
INPUTS:
shared_model
the mujoco model
"""
global mjc_model
mjc_model = shared_model
# @logger.catch
[docs]def objMAMuscle(x, side_id, wrap_type, wrap_id, pos_wrap, size_wrap, rotation_wrap,\
osim_ma, muscle, joints, joint_ranges, evalN):
""" this objective function calculate the difference of moment arms between opensim model and mujoco model.
Inputs:
x: vector
optimization vectors
side_id: int
the id list of the optimization sides
wrap_type: string
the type defined in O2MStep2
wrap_id: int
wrapping object id
pos_wrap: vector
the position of the wrapping object
size_wrap: vector
the size of the wrapping object
rotation_wrap: vector
the rotation vector of the wrapping object
osim_ma: vector
the moment arm vector of the osim model
muscle: string
muscle name
joints: list of string
the coordinates that the moment arms are caclulated
angle_ranges: list
the motion ranges of the mentioned coordinates
evalN: int
the number of evaluation points for the moment arm calculation
Outputs:
rms_ma: root mean square of the moment arm difference.
rms_x: root mean square of the optimizing parameters.
"""
global mjc_model
if type(joints) != list: # check if the joints are in a list or not
joints = [joints]
rms_ma = 0 # initialize the rms_ma
# update mjc model with new attaching_list and wrapping_list
mjcModel = updateWrapSites(mjc_model, wrap_type, wrap_id, pos_wrap, size_wrap, rotation_wrap, side_id, x)
# calculate moment arm
mjc_ma = computeMomentArmMuscleJoints(mjcModel, muscle, joints, joint_ranges, evalN)
# check the list length
if len(osim_ma) != len(mjc_ma):
logger.debug("osim and mjc models have different number wrapping coordinates of this wrapping object")
raise('osim and mjc models have different number wrapping coordinates of this wrapping object')
rms_ma = 0
if len(np.array(osim_ma).flatten()) != len(np.array(mjc_ma).flatten()):
logger.debug("osim and mjc models have different evalN numbers")
raise('osim and mjc models have different evalN numbers\n')
# calculate moment arm differences
# osim and mjc has opposite sign in MA, so 'add' is used ...
# also change the value from m to cm to have better understanding...
rms_ma = np.sqrt(np.sum((np.array(osim_ma).flatten() + np.array(mjc_ma).flatten())**2)/len(np.array(mjc_ma).flatten()))
return rms_ma
@logger.catch
[docs]def getMomentArmDiff(mjc_model, muscle, joints, jnt_motion_range, osim_ma_joints, evalN):
'''
Check if the moment arm differences between the osim and mjc models are beyond
thresholds
Only works for one muscle at multiple joints.
'''
err_ind = []
mjc_ma_joints = []
# joints should always be a list, even only one joint exist
if type(joints) != list:
joints = [joints]
# get moment arms from the mjc model
mjc_ma_joints = np.squeeze(computeMomentArmMuscleJoints(mjc_model,\
muscle, joints,\
jnt_motion_range, evalN))
# check the list length
if len(osim_ma_joints) != len(mjc_ma_joints):
logger.debug("osim and mjc models have different number of muscles of this wrapping object")
raise RuntimeError('osim and mjc models have different number of muscles of this wrapping object')
obj_org = 0
# calculate differences
osim_ma_joints_array = (np.array(osim_ma_joints)).flatten()
mjc_ma__joints_array = (np.array(mjc_ma_joints)).flatten()
obj_org = obj_org + np.sqrt(np.sum(( osim_ma_joints_array + mjc_ma__joints_array )**2)/len(osim_ma_joints_array))
# absolute and relative errors
abs_err = abs(osim_ma_joints_array + mjc_ma__joints_array)
osim_ma_joints_array_nonzeros = np.maximum(abs(osim_ma_joints_array), 1e-3)
rel_err = abs((osim_ma_joints_array + mjc_ma__joints_array)/(osim_ma_joints_array_nonzeros))
# check if absolulte error larger than 0.001 and relative error larger than 5%
err_ind = (list(set(np.where(abs_err > 0.001)[0]) & set(np.where(rel_err > 0.05)[0])))
return err_ind, obj_org
# optimize wrapping side using the self-developed PSO Optimizer
# @logger.catch
[docs]def maOptPSO_cust(mjc_model_path, muscle, joints, joint_ranges, side_id, wrap_type, wrap_id, pos_wrap, size_wrap,\
rotation_wrap, osim_ma, evalN, optParam_lb, optParam_ub, cost_org, speedy = False):
"""
This is a self-defined particle swarm optimizer (PSO) for the moment arm optimization. The purpurse of define
it here, instead of using other packages, is to gain custmizied feature control.
This is a temporary solution, eventually, these features should be added to the exist PSO python packages and
use them as the optimizor
INPUTS:
mjcModel: mujoco model
muscle: string
muscle name
joints: list
a list of coordinate names for the optimization
joint_ranges: list
a list of vectors that defines the motion ranges of the above coordinates
side_id: int
the id list of the optimization sides
wrap_type: string
the type defined in O2MStep2
wrap_id: int
wrapping object id
pos_wrap: vector
the position of the wrapping object
size_wrap: vector
the size of the wrapping object
rotation_wrap: vector
the rotation vector of the wrapping object
osim_ma: vector
the moment arm vector of the osim model
evalN: int
the number of evaluation points for moment arm opt
opt_param_lb: array
the lower bound of the optimized parameters
opt_param_ub: array
the upper bound of the optimized parameters
cost_org: double
the cost function value before the optimization
speedy: boolean
If true, reduce the particle size and iteration number
OUTPUTS:
res_side: optimization side locations
mjcModel: updated mujoco models
"""
# make sure the boundaries are array
if type(optParam_lb) != np.ndarray:
optParam_lb = np.array(optParam_lb)
if type(optParam_ub) != np.ndarray:
optParam_ub = np.array(optParam_ub)
# PSO options
c1 = 0.3 # inherit rate from the best of the current particle itself
c2 = 0.3 # inherit rate from the global best particle
w = 0.4 # inherit rate from the previous velocity
dimensions = len(optParam_lb) # optimizing parameter demensions
if speedy:
n_particles = 10
iteration_max = 25
break_threshold = 0.25 # when more than #% particles have the same values, then stop the optimization,
else:
n_particles = 10*dimensions # particle number is set to be 10 times the parameter demensions
iteration_max = 50
break_threshold = 0.5
# initilize the optimization variables randomly inside the bounds
x = optParam_lb + np.random.uniform(low = 0, high = 1,\
size = (n_particles, dimensions))*(optParam_ub - optParam_lb)
# initilize the velocities randomly inside the bounds
v = (optParam_lb - optParam_ub) + np.random.uniform(low = 0, high = 1,\
size = (n_particles, dimensions))*(optParam_ub - optParam_lb)*2
# the best cost function for each particle is initilized as 0
obj_b = np.zeros(n_particles)
p = x # assign the particle values
obj_g = [] # initilize an empty global cost function value
obj_g_old = [] # old cost function value
obj_g_iter = 0 # number of iterations that contain the same obj_g
itera = 0 # interation starts
similar_particles = 0 # number of similar particles, this will be used as an cretiria to stop the optimization
shared_model = mujoco.MjModel.from_xml_path(mjc_model_path)
with mp.Pool(initializer=initializeWorker, initargs=(shared_model,)) as pool:
while itera < iteration_max: # define the maximum iteration value
# Apply parallel computing using multiprocessing
# Right now, the mujoco sim cannot be pickled and transfer to objective function
# To solve this, mujoco file name need to transfer to objective function and load over there.
# prepare function inputs
x_input = []
for ix in x:
x_input.append((ix, side_id, wrap_type, \
wrap_id, pos_wrap, size_wrap,\
rotation_wrap, osim_ma, \
muscle, joints, joint_ranges, evalN))
obj_list = pool.starmap(objMAMuscle, x_input)
# update the local and global optimal
if itera == 0:
obj_b = obj_list
obj_g = min(obj_list)
obj_g_old = obj_g
g = x[obj_list.index(obj_g)]
else:
for iobj, obj in enumerate(obj_list):
if obj < obj_b[iobj]:
obj_b[iobj] = obj
p[iobj] = x[iobj]
if obj < obj_g:
obj_g = obj
g = x[iobj]
logger.info(f" PSO iteration: {itera} ; {round(similar_particles*100/n_particles)} percentage similarities; Best obj: {np.round(obj_g, 5)}")
# two random values to increase intersection between particles
r1 = np.random.rand(1)
r2 = np.random.rand(1)
v = w*v + c1*r1*(p - x) + c2*r2*(g - x) # calculate velocities
x = x + v # change to next iteration positions
# make sure they are within boundaries
for i, ix in enumerate(x):
x[i] = np.minimum(np.maximum(ix, optParam_lb), optParam_ub)
itera = itera + 1 # increase the iteration number
similar_particles = len(np.where((obj_b - min(obj_b))/min(obj_b) < 0.01)[0])
# break if certain percentage of particles have similar objective values
if similar_particles > break_threshold*n_particles:
logger.info(" Break the optimization, since certain number of similar particles reached")
break
# break if the obj_g is the same value for more than 10 iterations
if obj_g == obj_g_old:
obj_g_iter = obj_g_iter + 1
else:
obj_g_iter = 0
obj_g_old = obj_g
if obj_g_iter > 10:
logger.info(" Break the optimization, since global obj maintained the same value for certain iterations")
break
# update mjc model with new attaching_list and wrapping_list
shared_model = updateWrapSites(shared_model, wrap_type, wrap_id, pos_wrap, size_wrap, rotation_wrap, side_id, g)
# save optimized results
res_site = {"wrap_type": wrap_type, "cost_org": cost_org, "cost_opt": obj_g, "res_opt": g}
return res_site, shared_model