# Copyright 2021 DeepMind Technologies Limited. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS-IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Constants and general tooling for TCV plant.""" import collections from typing import Sequence, Text from dm_env import specs import numpy as np from fusion_tcv import named_array DT = 1e-4 # ie 10kHz # Below are general input/output specifications used for controllers that are # run on hardware. This interface corresponds to the so called "KH hybrid" # controller specification that is used in various experiments by EPFL. Hence, # this interface definition contains measurements and actions not used in our # tasks. # Number of actions the environment is exposing. Includes dummy (FAST) action. NUM_ACTIONS = 20 # Number of actuated coils in sim (without the dummy action coil). NUM_COILS_ACTUATED = 19 # Current and voltage limits by coil type # Note this are the limits used in the environments and are different # from the 'machine engineering limits' (as used/exposed by FGE). # We apply a safety factor (=< 1.0) to the engineering limits. CURRENT_SAFETY_FACTOR = 0.8 ENV_COIL_MAX_CURRENTS = collections.OrderedDict( E=7500*CURRENT_SAFETY_FACTOR, F=7500*CURRENT_SAFETY_FACTOR, OH=26000*CURRENT_SAFETY_FACTOR, DUMMY=2000*CURRENT_SAFETY_FACTOR, G=2000*CURRENT_SAFETY_FACTOR) # The g-coil has a saturation voltage that is tunable on a shot-by-shot basis. # There is a deadband, where an action with absolute value of less than 8% of # the saturation voltage is treated as zero. ENV_G_COIL_SATURATION_VOLTAGE = 300 ENV_G_COIL_DEADBAND = ENV_G_COIL_SATURATION_VOLTAGE * 0.08 VOLTAGE_SAFETY_FACTOR = 1.0 ENV_COIL_MAX_VOLTAGE = collections.OrderedDict( E=1400*VOLTAGE_SAFETY_FACTOR, F=2200*VOLTAGE_SAFETY_FACTOR, OH=1400*VOLTAGE_SAFETY_FACTOR, DUMMY=400*VOLTAGE_SAFETY_FACTOR, # This value is also used to clip values for the internal controller, # and also to set the deadband voltage. G=ENV_G_COIL_SATURATION_VOLTAGE) # Ordered actions send by a controller to the TCV. TCV_ACTIONS = ( 'E_001', 'E_002', 'E_003', 'E_004', 'E_005', 'E_006', 'E_007', 'E_008', 'F_001', 'F_002', 'F_003', 'F_004', 'F_005', 'F_006', 'F_007', 'F_008', 'OH_001', 'OH_002', 'DUMMY_001', # GAS, ignored by TCV. 'G_001' # FAST ) TCV_ACTION_INDICES = {n: i for i, n in enumerate(TCV_ACTIONS)} TCV_ACTION_TYPES = collections.OrderedDict( E=8, F=8, OH=2, DUMMY=1, G=1, ) # Map the TCV actions to ranges of indices in the array. TCV_ACTION_RANGES = named_array.NamedRanges(TCV_ACTION_TYPES) # The voltages seem not to be centered at 0, but instead near these values: TCV_ACTION_OFFSETS = { 'E_001': 6.79, 'E_002': -10.40, 'E_003': -1.45, 'E_004': 0.18, 'E_005': 11.36, 'E_006': -0.95, 'E_007': -4.28, 'E_008': 44.22, 'F_001': 38.49, 'F_002': -2.94, 'F_003': 5.58, 'F_004': 1.09, 'F_005': -36.63, 'F_006': -9.18, 'F_007': 5.34, 'F_008': 10.53, 'OH_001': -53.63, 'OH_002': -14.76, } TCV_ACTION_DELAYS = { 'E': [0.0005] * 8, 'F': [0.0005] * 8, 'OH': [0.0005] * 2, 'G': [0.0001], } # Ordered measurements and their dimensions from to the TCV controller specs. TCV_MEASUREMENTS = collections.OrderedDict( clint_vloop=1, # Flux loop 1 clint_rvloop=37, # Difference of flux between loops 2-38 and flux loop 1 bm=38, # Magnetic field probes IE=8, # E-coil currents IF=8, # F-coil currents IOH=2, # OH-coil currents Bdot=20, # Selection of 20 time-derivatives of magnetic field probes (bm). DIOH=1, # OH-coil currents difference: OH(0) - OH(1). FIR_FRINGE=1, # Not used, ignore. IG=1, # G-coil current ONEMM=1, # Not used, ignore vloop=1, # Flux loop 1 derivative IPHI=1, # Current through the Toroidal Field coils. Constant. Ignore. ) NUM_MEASUREMENTS = sum(TCV_MEASUREMENTS.values()) # map the TCV measurements to ranges of indices in the array TCV_MEASUREMENT_RANGES = named_array.NamedRanges(TCV_MEASUREMENTS) # Several of the measurement probes for the rvloops are broken. Add an extra key # that allows us to only grab the usable ones BROKEN_RVLOOP_IDXS = [9, 10, 11] TCV_MEASUREMENT_RANGES.set_range('clint_rvloop_usable', [ idx for i, idx in enumerate(TCV_MEASUREMENT_RANGES['clint_rvloop']) if i not in BROKEN_RVLOOP_IDXS]) TCV_COIL_CURRENTS_INDEX = [ *TCV_MEASUREMENT_RANGES['IE'], *TCV_MEASUREMENT_RANGES['IF'], *TCV_MEASUREMENT_RANGES['IOH'], *TCV_MEASUREMENT_RANGES['IPHI'], # In place of DUMMY. *TCV_MEASUREMENT_RANGES['IG'], ] # References for what we want the agent to accomplish. REF_RANGES = named_array.NamedRanges({ 'R': 2, 'Z': 2, 'Ip': 2, 'kappa': 2, 'delta': 2, 'radius': 2, 'lambda': 2, 'diverted': 2, # bool, must be diverted 'limited': 2, # bool, must be limited 'shape_r': 32, 'shape_z': 32, 'x_points_r': 8, 'x_points_z': 8, 'legs_r': 16, # Use for diverted/snowflake 'legs_z': 16, 'limit_point_r': 2, 'limit_point_z': 2, }) # Environments should use a consistent datatype for interacting with agents. ENVIRONMENT_DATA_TYPE = np.float64 def observation_spec(): """Observation spec for all TCV environments.""" return { 'references': specs.Array( shape=(REF_RANGES.size,), dtype=ENVIRONMENT_DATA_TYPE, name='references'), 'measurements': specs.Array( shape=(TCV_MEASUREMENT_RANGES.size,), dtype=ENVIRONMENT_DATA_TYPE, name='measurements'), 'last_action': specs.Array( shape=(TCV_ACTION_RANGES.size,), dtype=ENVIRONMENT_DATA_TYPE, name='last_action'), } def measurements_to_dict(measurements): """Converts a single measurement vector or a time series to a dict. Args: measurements: A single measurement of size `NUM_MEASUREMENTS` or a time series, where the batch dimension is last, shape: (NUM_MEASUREMENTS, t). Returns: A dict mapping keys `TCV_MEASUREMENTS` to the corresponding measurements. """ assert measurements.shape[0] == NUM_MEASUREMENTS measurements_dict = collections.OrderedDict() index = 0 for key, dim in TCV_MEASUREMENTS.items(): measurements_dict[key] = measurements[index:index + dim, ...] index += dim return measurements_dict def dict_to_measurement(measurement_dict): """Converts a single measurement dict to a vector or time series. Args: measurement_dict: A dict with the measurement keys containing np arrays of size (meas_size, ...). The inner sizes all have to be the same. Returns: An array of size (num_measurements, ...) """ assert len(measurement_dict) == len(TCV_MEASUREMENTS) # Grab the shape of the first array. shape = measurement_dict['clint_vloop'].shape out_shape = list(shape) out_shape[0] = NUM_MEASUREMENTS out_shape = tuple(out_shape) measurements = np.zeros((out_shape)) index = 0 for key, dim in TCV_MEASUREMENTS.items(): dim = TCV_MEASUREMENTS[key] measurements[index:index + dim, ...] = measurement_dict[key] index += dim return measurements def action_spec(): return get_coil_spec(TCV_ACTIONS, ENV_COIL_MAX_VOLTAGE, ENVIRONMENT_DATA_TYPE) def get_coil_spec(coil_names: Sequence[Text], spec_mapping, dtype=ENVIRONMENT_DATA_TYPE) -> specs.BoundedArray: """Maps specs indexed by coil type to coils given their type.""" coil_max, coil_min = [], [] for name in coil_names: # Coils names are _ coil_type, _ = name.split('_') coil_max.append(spec_mapping[coil_type]) coil_min.append(-spec_mapping[coil_type]) return specs.BoundedArray( shape=(len(coil_names),), dtype=dtype, minimum=coil_min, maximum=coil_max) INNER_LIMITER_R = 0.62400001 OUTER_LIMITER_R = 1.14179182 LIMITER_WIDTH = OUTER_LIMITER_R - INNER_LIMITER_R LIMITER_RADIUS = LIMITER_WIDTH / 2 VESSEL_CENTER_R = INNER_LIMITER_R + LIMITER_RADIUS