PokéLLMon 源码解析(四)
.\PokeLLMon\poke_env\exceptions.py
"""
This module contains exceptions.
"""# 定义一个自定义异常类 ShowdownException,继承自内置异常类 Exception
class ShowdownException(Exception):
    """Raised when a non-managed message is received from the server."""
.\PokeLLMon\poke_env\player\baselines.py
# 导入必要的模块
from typing import List
import json
import osfrom poke_env.environment.abstract_battle import AbstractBattle
from poke_env.environment.double_battle import DoubleBattle
from poke_env.environment.move_category import MoveCategory
from poke_env.environment.pokemon import Pokemon
from poke_env.environment.side_condition import SideCondition
from poke_env.player.player import Player
from poke_env.data.gen_data import GenData# 从文件中加载招式效果数据
# Load the move-effect table once, at import time. The path is relative, so it
# is resolved against the current working directory of the process.
# The file is decoded explicitly as UTF-8 so the platform's default locale
# encoding cannot change how the JSON text is read.
with open("./poke_env/data/static/moves/moves_effect.json", "r", encoding="utf-8") as f:
    move_effect = json.load(f)


# Compute the damage multipliers of move types.
def calculate_move_type_damage_multipier(type_1, type_2, type_chart, constraint_type_list):
    """Group type names by the damage multiplier found in ``type_chart``.

    Args:
        type_1: First type name; used as the outer key of ``type_chart``.
        type_2: Second type name, or a falsy value when there is none. When
            given, the multipliers of both rows are multiplied per type.
        type_chart: Nested mapping read as ``type_chart[type_1][t]`` (and
            ``type_chart[type_2][t]``) for each type name ``t``.
        constraint_type_list: Optional collection of type names. When truthy,
            every returned list only keeps names that appear in it.

    Returns:
        A 5-tuple of lists of type names whose multiplier is, in this order,
        4, 2, 1/2, 1/4 and 0. Names with any other multiplier are left out.
        The names keep the iteration order of the multiplier table, so the
        result is deterministic (the previous set intersection was not).
    """
    type_names = (
        "BUG,DARK,DRAGON,ELECTRIC,FAIRY,FIGHTING,FIRE,FLYING,GHOST,GRASS,"
        "GROUND,ICE,NORMAL,POISON,PSYCHIC,ROCK,STEEL,WATER"
    ).split(",")

    if type_2:
        # Dual type: combine both rows of the chart.
        multipliers = {
            name: type_chart[type_1][name] * type_chart[type_2][name]
            for name in type_names
        }
    else:
        multipliers = type_chart[type_1]

    # One bucket per multiplier of interest; anything else (e.g. 1) is skipped.
    buckets = {4: [], 2: [], 1 / 2: [], 1 / 4: [], 0: []}
    for name, value in multipliers.items():
        bucket = buckets.get(value)
        if bucket is not None:
            bucket.append(name)

    if constraint_type_list:
        allowed = set(constraint_type_list)
        # Filter in place of a set intersection so the ordering stays stable.
        buckets = {
            value: [name for name in names if name in allowed]
            for value, names in buckets.items()
        }

    return buckets[4], buckets[2], buckets[1 / 2], buckets[1 / 4], buckets[0]
# Build the type-effectiveness prompt for a Pokémon from its type names.
def move_type_damage_wraper(pokemon_name, type_1, type_2, type_chart, constraint_type_list=None):
    """Return one sentence describing which move types affect a Pokémon and how.

    Args:
        pokemon_name: Name placed at the start of the sentence.
        type_1: First type name, forwarded to
            ``calculate_move_type_damage_multipier``.
        type_2: Second type name or a falsy value, forwarded likewise.
        type_chart: Type chart, forwarded likewise.
        constraint_type_list: Optional collection of type names that restricts
            which types are mentioned.

    Returns:
        A string such as ``"X can be effectively attacked by A, B moves, is
        resistant to C moves"``, or ``""`` when every category is empty.
    """
    (
        extreme_effective_type_list,
        effective_type_list,
        resistant_type_list,
        extreme_resistant_type_list,
        immune_type_list,
    ) = calculate_move_type_damage_multipier(type_1, type_2, type_chart, constraint_type_list)

    labelled_lists = (
        ("can be super-effectively attacked by ", extreme_effective_type_list),
        ("can be effectively attacked by ", effective_type_list),
        ("is resistant to ", resistant_type_list),
        ("is super-resistant to ", extreme_resistant_type_list),
        ("is immuned to ", immune_type_list),
    )
    clauses = [
        label + ", ".join(names) + " moves"
        for label, names in labelled_lists
        if names
    ]
    # Every category counts, including the two "extreme" ones, so a Pokémon
    # with only 4x or 0.25x entries still gets a prompt.
    if not clauses:
        return ""
    # Joining the clauses avoids a stray comma right after the name when the
    # first category is empty.
    return f"{pokemon_name} " + ", ".join(clauses)


# A Player subclass that picks the move with the highest base power.
class MaxBasePowerPlayer(Player):
    """Player that uses the available move with the highest base power."""

    def choose_move(self, battle: AbstractBattle):
        """Return an order for the strongest available move.

        Falls back to ``choose_random_move`` when ``battle.available_moves``
        is empty.
        """
        if not battle.available_moves:
            return self.choose_random_move(battle)
        strongest = max(
            battle.available_moves, key=lambda candidate: candidate.base_power
        )
        return self.create_order(strongest)


# A Player subclass that implements simple heuristics.
class SimpleHeuristicsPlayer(Player):
    """Heuristic helpers based on type matchups, speed, HP and stat boosts."""

    # Strings mapped to the SideCondition member they correspond to.
    # NOTE(review): the keys are presumably Showdown move ids; the Stealth Rock
    # key used to be misspelled "stealhrock" and is corrected here.
    ENTRY_HAZARDS = {
        "spikes": SideCondition.SPIKES,
        "stealthrock": SideCondition.STEALTH_ROCK,
        "stickyweb": SideCondition.STICKY_WEB,
        "toxicspikes": SideCondition.TOXIC_SPIKES,
    }

    # Names of the anti-hazard moves.
    ANTI_HAZARDS_MOVES = {"rapidspin", "defog"}

    # Amount added to / subtracted from the matchup score for the faster side.
    SPEED_TIER_COEFICIENT = 0.1
    # Weight applied to each side's HP fraction in the matchup score.
    HP_FRACTION_COEFICIENT = 0.4
    # Matchup score below which switching out is recommended.
    SWITCH_OUT_MATCHUP_THRESHOLD = -2

    # Multipliers per boost level, as returned by ``boost_multiplier``.
    _ACCURACY_BOOST_MULTIPLIERS = {
        0: 1.0, 1: 1.33, 2: 1.66, 3: 2.0, 4: 2.5, 5: 2.66, 6: 3.0,
        -1: 0.75, -2: 0.6, -3: 0.5, -4: 0.43, -5: 0.36, -6: 0.33,
    }
    _STAT_BOOST_MULTIPLIERS = {
        0: 1.0, 1: 1.5, 2: 2.0, 3: 2.5, 4: 3.0, 5: 3.5, 6: 4.0,
        -1: 0.67, -2: 0.5, -3: 0.4, -4: 0.33, -5: 0.29, -6: 0.25,
    }

    # Labels per ``status.value``, as returned by ``check_status``.
    _STATUS_LABELS = {
        1: "burnt",
        2: "fainted",
        3: "frozen",
        4: "paralyzed",
        5: "poisoned",
        6: "sleeping",
        7: "toxic",
    }

    def _estimate_matchup(self, mon: Pokemon, opponent: Pokemon):
        """Return a matchup score for ``mon`` against ``opponent``.

        The score is the largest ``opponent.damage_multiplier(t)`` over the
        types of ``mon`` minus the largest ``mon.damage_multiplier(t)`` over
        the types of ``opponent``, adjusted by a speed bonus and by both
        sides' HP fractions.
        """
        score = max(opponent.damage_multiplier(t) for t in mon.types if t is not None)
        score -= max(mon.damage_multiplier(t) for t in opponent.types if t is not None)

        # Speed comparison on base stats.
        if mon.base_stats["spe"] > opponent.base_stats["spe"]:
            score += self.SPEED_TIER_COEFICIENT
        elif opponent.base_stats["spe"] > mon.base_stats["spe"]:
            score -= self.SPEED_TIER_COEFICIENT

        # Remaining HP of both sides.
        score += mon.current_hp_fraction * self.HP_FRACTION_COEFICIENT
        score -= opponent.current_hp_fraction * self.HP_FRACTION_COEFICIENT
        return score

    def _should_dynamax(self, battle: AbstractBattle, n_remaining_mons: int):
        """Return True when the active Pokémon should dynamax this turn.

        Args:
            battle: Current battle state.
            n_remaining_mons: Number of remaining Pokémon, supplied by the caller.
        """
        if battle.can_dynamax and self._dynamax_disable is False:
            active = battle.active_pokemon
            full_hp_count = len(
                [m for m in battle.team.values() if m.current_hp_fraction == 1]
            )
            # The active Pokémon is the last one at full HP.
            if full_hp_count == 1 and active.current_hp_fraction == 1:
                return True
            # Favourable matchup while both active Pokémon are at full HP.
            if (
                self._estimate_matchup(active, battle.opponent_active_pokemon) > 0
                and active.current_hp_fraction == 1
                and battle.opponent_active_pokemon.current_hp_fraction == 1
            ):
                return True
            # Only one Pokémon left.
            if n_remaining_mons == 1:
                return True
        return False

    def _should_switch_out(self, battle: AbstractBattle):
        """Return True when the active Pokémon should be switched out.

        A switch is only considered when at least one available switch has a
        positive matchup score against the opposing active Pokémon.
        """
        active = battle.active_pokemon
        opponent = battle.opponent_active_pokemon

        has_good_switch = any(
            self._estimate_matchup(m, opponent) > 0 for m in battle.available_switches
        )
        if not has_good_switch:
            return False

        # Heavily lowered defensive boosts.
        if active.boosts["def"] <= -3 or active.boosts["spd"] <= -3:
            return True
        # Heavily lowered attack boost on a Pokémon whose atk stat is at least its spa.
        if active.boosts["atk"] <= -3 and active.stats["atk"] >= active.stats["spa"]:
            return True
        # Heavily lowered special-attack boost on a Pokémon whose spa stat is at least its atk.
        if active.boosts["spa"] <= -3 and active.stats["atk"] <= active.stats["spa"]:
            return True
        # Matchup score below the threshold.
        if self._estimate_matchup(active, opponent) < self.SWITCH_OUT_MATCHUP_THRESHOLD:
            return True
        return False

    def _stat_estimation(self, mon: Pokemon, stat: str):
        """Estimate a stat value of ``mon`` from its base stat and current boost.

        Positive boosts scale by ``(2 + boost) / 2`` and non-positive boosts by
        ``2 / (2 - boost)``. The previous ``> 1`` test sent a +1 boost through
        the second formula, giving 2.0 (the same as +2) instead of 1.5.
        """
        stage = mon.boosts[stat]
        if stage > 0:
            boost = (2 + stage) / 2
        else:
            boost = 2 / (2 - stage)
        return ((2 * mon.base_stats[stat] + 31) + 5) * boost

    def calc_reward(self, current_battle: AbstractBattle) -> float:
        """Return the reward computed by ``reward_computing_helper`` for the battle."""
        return self.reward_computing_helper(
            current_battle, fainted_value=2.0, hp_value=1.0, victory_value=30.0
        )

    def boost_multiplier(self, state, level):
        """Return the multiplier for a boost ``level`` of the stat ``state``.

        ``"accuracy"`` uses its own table; every other ``state`` shares one
        table. Returns ``None`` when ``level`` is not between -6 and 6.
        """
        if state == "accuracy":
            table = self._ACCURACY_BOOST_MULTIPLIERS
        else:
            table = self._STAT_BOOST_MULTIPLIERS
        return table.get(level)

    def check_status(self, status):
        """Return a label for ``status``.

        Returns ``"healthy"`` when ``status`` is falsy, the label matching
        ``status.value`` (1 to 7) otherwise, and ``None`` for any other value.
        """
        if not status:
            return "healthy"
        return self._STATUS_LABELS.get(status.value)
.\PokeLLMon\poke_env\player\battle_order.py
# 从 dataclasses 模块中导入 dataclass 装饰器
# 从 typing 模块中导入 Any, List, Optional, Union 类型
# 从 poke_env.environment.double_battle 模块中导入 DoubleBattle 类
# 从 poke_env.environment.move 模块中导入 Move 类
# 从 poke_env.environment.pokemon 模块中导入 Pokemon 类
from dataclasses import dataclass
from typing import Any, List, Optional, Union

from poke_env.environment.double_battle import DoubleBattle
from poke_env.environment.move import Move
from poke_env.environment.pokemon import Pokemon


# Define the BattleOrder data class.
@dataclass
class BattleOrder:
    """A single order, rendered as a ``/choose ...`` command string."""

    # The move to use or the Pokémon to switch to (may be None).
    order: Optional[Union[Move, Pokemon]]
    # Modifier flags; at most one suffix is written into the message.
    mega: bool = False
    z_move: bool = False
    dynamax: bool = False
    terastallize: bool = False
    # Target position appended to move orders unless it is the empty target.
    move_target: int = DoubleBattle.EMPTY_TARGET_POSITION

    # Command string of the default order.
    DEFAULT_ORDER = "/choose default"

    def __str__(self) -> str:
        """Return the command string (same as ``message``)."""
        return self.message

    @property
    def message(self) -> str:
        """Command string for this order; ``""`` when ``order`` is neither a Move nor a Pokemon."""
        chosen = self.order

        if isinstance(chosen, Move):
            if chosen.id == "recharge":
                return "/choose move 1"

            # Only the first flag that is set contributes a suffix.
            suffix = ""
            flagged_suffixes = (
                (self.mega, " mega"),
                (self.z_move, " zmove"),
                (self.dynamax, " dynamax"),
                (self.terastallize, " terastallize"),
            )
            for flag, text in flagged_suffixes:
                if flag:
                    suffix = text
                    break

            command = f"/choose move {chosen.id}" + suffix
            if self.move_target != DoubleBattle.EMPTY_TARGET_POSITION:
                command += f" {self.move_target}"
            return command

        if isinstance(chosen, Pokemon):
            return f"/choose switch {chosen.species}"

        return ""


# Define the DefaultBattleOrder class, which inherits from BattleOrder.
class DefaultBattleOrder(BattleOrder):
    """Order whose message is always ``DEFAULT_ORDER``."""

    def __init__(self, *args: Any, **kwargs: Any):
        """Accept and ignore any arguments; no fields are set."""

    @property
    def message(self) -> str:
        """Return the default command string."""
        return self.DEFAULT_ORDER


# Define the DoubleBattleOrder data class, which inherits from BattleOrder.
@dataclass
class DoubleBattleOrder(BattleOrder):
    """Pair of orders combined into a single command string."""

    def __init__(
        self,
        first_order: Optional[BattleOrder] = None,
        second_order: Optional[BattleOrder] = None,
    ):
        """Store the two optional orders."""
        self.first_order = first_order
        self.second_order = second_order

    @property
    def message(self) -> str:
        """Combined command string for both orders.

        With both orders present, the second message has its ``"/choose "``
        prefix removed and is appended after ``", "``. With a single order,
        ``", default"`` is appended to its message. With none,
        ``DEFAULT_ORDER`` is returned.
        """
        first, second = self.first_order, self.second_order
        if first and second:
            tail = second.message.replace("/choose ", "")
            return first.message + ", " + tail
        if first:
            return first.message + ", default"
        if second:
            return second.message + ", default"
        return self.DEFAULT_ORDER

    @staticmethod
    def join_orders(first_orders: List[BattleOrder], second_orders: List[BattleOrder]):
        """Combine two lists of orders into a list of double orders.

        When both lists are non-empty, every pair is kept unless both orders
        set the same flag (mega, z_move, dynamax or terastallize) or both have
        an equal ``order``. When only one list is non-empty, each of its orders
        becomes the ``first_order`` of a double order. In every other case a
        single ``DefaultBattleOrder`` is returned.
        """
        if not first_orders and not second_orders:
            return [DefaultBattleOrder()]
        if not second_orders:
            return [DoubleBattleOrder(first_order=single) for single in first_orders]
        if not first_orders:
            return [DoubleBattleOrder(first_order=single) for single in second_orders]

        combined = []
        for left in first_orders:
            for right in second_orders:
                # Skip pairs in which both orders use the same mechanic.
                if left.mega and right.mega:
                    continue
                if left.z_move and right.z_move:
                    continue
                if left.dynamax and right.dynamax:
                    continue
                if left.terastallize and right.terastallize:
                    continue
                # Skip pairs whose underlying orders compare equal.
                if left.order == right.order:
                    continue
                combined.append(DoubleBattleOrder(first_order=left, second_order=right))

        if combined:
            return combined
        return [DefaultBattleOrder()]
# Define the ForfeitBattleOrder class, which inherits from BattleOrder.
class ForfeitBattleOrder(BattleOrder):
    """Order whose message is ``"/forfeit"``."""

    def __init__(self, *args: Any, **kwargs: Any):
        """Accept and ignore any arguments; no fields are set."""

    @property
    def message(self) -> str:
        """Return the forfeit command string."""
        return "/forfeit"
.\PokeLLMon\poke_env\player\gpt_player.py
import json # 导入 json 模块
import os # 导入 os 模块
import random # 导入 random 模块
from typing import List # 导入 List 类型提示
from poke_env.environment.abstract_battle import AbstractBattle # 导入 AbstractBattle 类
from poke_env.environment.double_battle import DoubleBattle # 导入 DoubleBattle 类
from poke_env.environment.move_category import MoveCategory # 导入 MoveCategory 类
from poke_env.environment.pokemon import Pokemon # 导入 Pokemon 类
from poke_env.environment.side_condition import SideCondition # 导入 SideCondition 类
from poke_env.player.player import Player, BattleOrder # 导入 Player 和 BattleOrder 类
from typing import Dict, List, Optional, Union # 导入 Dict, List, Optional, Union 类型提示
from poke_env.environment.move import Move # 导入 Move 类
import time # 导入 time 模块
import json # 再次导入 json 模块(重复导入)
from openai import OpenAI # 导入 OpenAI 类
from poke_env.data.gen_data import GenData  # import the GenData class


def calculate_move_type_damage_multipier(type_1, type_2, type_chart, constraint_type_list):
    """Group type names by the damage multiplier found in ``type_chart``.

    Args:
        type_1: First type name; used as the outer key of ``type_chart``.
        type_2: Second type name, or a falsy value when there is none. When
            given, the multipliers of both rows are multiplied per type.
        type_chart: Nested mapping read as ``type_chart[type_1][t]`` (and
            ``type_chart[type_2][t]``) for each type name ``t``.
        constraint_type_list: Optional collection of type names. When truthy,
            every returned list only keeps names that appear in it.

    Returns:
        A 5-tuple of lists of capitalized type names whose multiplier is, in
        this order, 4, 2, 1/2, 1/4 and 0. Names with any other multiplier are
        left out. The names keep the iteration order of the multiplier table,
        so the result is deterministic (the previous set intersection was not).
    """
    type_names = (
        "BUG,DARK,DRAGON,ELECTRIC,FAIRY,FIGHTING,FIRE,FLYING,GHOST,GRASS,"
        "GROUND,ICE,NORMAL,POISON,PSYCHIC,ROCK,STEEL,WATER"
    ).split(",")

    if type_2:
        # Dual type: combine both rows of the chart.
        multipliers = {
            name: type_chart[type_1][name] * type_chart[type_2][name]
            for name in type_names
        }
    else:
        multipliers = type_chart[type_1]

    # One bucket per multiplier of interest; anything else (e.g. 1) is skipped.
    buckets = {4: [], 2: [], 1 / 2: [], 1 / 4: [], 0: []}
    for name, value in multipliers.items():
        bucket = buckets.get(value)
        if bucket is not None:
            bucket.append(name)

    if constraint_type_list:
        allowed = set(constraint_type_list)
        # Filter in place of a set intersection so the ordering stays stable.
        buckets = {
            value: [name for name in names if name in allowed]
            for value, names in buckets.items()
        }

    return tuple(
        [name.capitalize() for name in buckets[value]]
        for value in (4, 2, 1 / 2, 1 / 4, 0)
    )
# Build the type-effectiveness prompt for a given Pokémon.
def move_type_damage_wraper(pokemon, type_chart, constraint_type_list=None):
    """Return sentences describing how attack types fare against ``pokemon``.

    Args:
        pokemon: Object exposing ``type_1``, ``type_2`` and ``species``.
        type_chart: Type chart forwarded to
            ``calculate_move_type_damage_multipier``.
        constraint_type_list: Optional collection of type names that restricts
            which types are mentioned.

    Returns:
        The concatenated sentences, each preceded by a space, or ``""`` when
        every category is empty.
    """
    type_1 = None
    type_2 = None
    if pokemon.type_1:
        type_1 = pokemon.type_1.name
        if pokemon.type_2:
            type_2 = pokemon.type_2.name

    (
        extreme_effective_type_list,
        effective_type_list,
        resistant_type_list,
        extreme_resistant_type_list,
        immune_type_list,
    ) = calculate_move_type_damage_multipier(type_1, type_2, type_chart, constraint_type_list)

    # Each category is paired with the wording used in its sentence.
    verdicts = (
        (extreme_effective_type_list, "-type attack is extremely-effective (4x damage) to "),
        (effective_type_list, "-type attack is super-effective (2x damage) to "),
        (resistant_type_list, "-type attack is ineffective (0.5x damage) to "),
        (extreme_resistant_type_list, "-type attack is highly ineffective (0.25x damage) to "),
        (immune_type_list, "-type attack is zero effect (0x damage) to "),
    )

    sentences = []
    for names, verdict in verdicts:
        if names:
            sentences.append(" " + ", ".join(names) + verdict + f"{pokemon.species}.")
    return "".join(sentences)


# Define a class that inherits from Player.
class LLMPlayer(Player):
    """Player helpers that query an OpenAI chat model and parse its replies.

    NOTE(review): the methods below read attributes that are not initialised
    in this excerpt (``api_key``, ``completion_tokens``, ``prompt_tokens``,
    ``SPEED_TIER_COEFICIENT``, ``HP_FRACTION_COEFICIENT``, ``_dynamax_disable``,
    ``_reward_buffer``, ``battles``). They are presumably set by an
    ``__init__`` that is not shown here or inherited from ``Player`` - confirm.
    """

    # Multipliers per boost level, as returned by ``boost_multiplier``.
    _ACCURACY_BOOST_MULTIPLIERS = {
        0: 1.0, 1: 1.33, 2: 1.66, 3: 2.0, 4: 2.5, 5: 2.66, 6: 3.0,
        -1: 0.75, -2: 0.6, -3: 0.5, -4: 0.43, -5: 0.36, -6: 0.33,
    }
    _STAT_BOOST_MULTIPLIERS = {
        0: 1.0, 1: 1.5, 2: 2.0, 3: 2.5, 4: 3.0, 5: 3.5, 6: 4.0,
        -1: 0.67, -2: 0.5, -3: 0.4, -4: 0.33, -5: 0.29, -6: 0.25,
    }

    # Labels per ``status.value``, as returned by ``check_status``.
    _STATUS_LABELS = {
        1: "burnt",
        2: "fainted",
        3: "frozen",
        4: "paralyzed",
        5: "poisoned",
        6: "sleeping",
        7: "toxic",
    }

    def chatgpt(self, system_prompt, user_prompt, model, temperature=0.7, json_format=False, seed=None, stop=None, max_tokens=200) -> str:
        """Send one system + user prompt to the chat completions API.

        Args:
            system_prompt: Content of the system message.
            user_prompt: Content of the user message.
            model: Model name passed to the API.
            temperature: Sampling temperature passed to the API.
            json_format: When true, request ``{"type": "json_object"}`` as the
                response format.
            seed: Accepted for compatibility; it is not forwarded to the API.
            stop: Stop sequences; ``None`` (the default) means an empty list.
            max_tokens: Completion token limit passed to the API.

        Returns:
            The message content of the first choice.

        Side effects:
            Adds the reported completion and prompt token counts to
            ``self.completion_tokens`` and ``self.prompt_tokens``.
        """
        # A fresh list per call instead of a shared mutable default argument.
        if stop is None:
            stop = []

        client = OpenAI(api_key=self.api_key)
        request = {
            "model": model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt},
            ],
            "temperature": temperature,
            "stream": False,
            # "seed": seed,
            "stop": stop,
            "max_tokens": max_tokens,
        }
        if json_format:
            request["response_format"] = {"type": "json_object"}

        response = client.chat.completions.create(**request)
        outputs = response.choices[0].message.content
        self.completion_tokens += response.usage.completion_tokens
        self.prompt_tokens += response.usage.prompt_tokens
        return outputs

    def _estimate_matchup(self, mon: Pokemon, opponent: Pokemon):
        """Return a matchup score for ``mon`` against ``opponent``.

        The score is the largest ``opponent.damage_multiplier(t)`` over the
        types of ``mon`` minus the largest ``mon.damage_multiplier(t)`` over
        the types of ``opponent``, adjusted by a speed bonus and by both
        sides' HP fractions.
        """
        score = max(opponent.damage_multiplier(t) for t in mon.types if t is not None)
        score -= max(mon.damage_multiplier(t) for t in opponent.types if t is not None)

        # Speed comparison on base stats.
        if mon.base_stats["spe"] > opponent.base_stats["spe"]:
            score += self.SPEED_TIER_COEFICIENT
        elif opponent.base_stats["spe"] > mon.base_stats["spe"]:
            score -= self.SPEED_TIER_COEFICIENT

        # Remaining HP of both sides.
        score += mon.current_hp_fraction * self.HP_FRACTION_COEFICIENT
        score -= opponent.current_hp_fraction * self.HP_FRACTION_COEFICIENT
        return score

    def _should_dynamax(self, battle: AbstractBattle):
        """Return True when the active Pokémon should dynamax this turn."""
        # Number of team members that have not fainted.
        n_remaining_mons = len([m for m in battle.team.values() if m.fainted is False])

        if battle.can_dynamax and self._dynamax_disable is False:
            active = battle.active_pokemon
            full_hp_count = len(
                [m for m in battle.team.values() if m.current_hp_fraction == 1]
            )
            # The active Pokémon is the last one at full HP.
            if full_hp_count == 1 and active.current_hp_fraction == 1:
                return True
            # Favourable matchup while both active Pokémon are at full HP.
            if (
                self._estimate_matchup(active, battle.opponent_active_pokemon) > 0
                and active.current_hp_fraction == 1
                and battle.opponent_active_pokemon.current_hp_fraction == 1
            ):
                return True
            # Only one Pokémon left.
            if n_remaining_mons == 1:
                return True
        return False

    # NOTE(review): the ``def`` line of this method was missing from the
    # source; the name ``parse`` and the parameters are reconstructed from the
    # variables the body reads - confirm against the callers.
    def parse(self, llm_output, battle):
        """Turn an LLM reply of the form ``{"move": ...}`` or ``{"switch": ...}`` into an order.

        The text between the first ``{`` and the last ``}`` of ``llm_output``
        is decoded as JSON.

        Raises:
            ValueError: When no available move or switch matches the reply
                (``json.loads`` failures are ``ValueError`` subclasses too).
        """
        json_start = llm_output.find('{')
        json_end = llm_output.rfind('}') + 1  # one past the last closing brace
        llm_action_json = json.loads(llm_output[json_start:json_end])

        next_action = None
        if "move" in llm_action_json:
            # Normalise the move name before comparing it with move ids.
            llm_move_id = llm_action_json["move"].replace(" ", "").replace("-", "")
            for move in battle.available_moves:
                if move.id.lower() == llm_move_id.lower():
                    next_action = self.create_order(move, dynamax=self._should_dynamax(battle))
        elif "switch" in llm_action_json:
            llm_switch_species = llm_action_json["switch"]
            for pokemon in battle.available_switches:
                if pokemon.species.lower() == llm_switch_species.lower():
                    next_action = self.create_order(pokemon)

        if next_action is None:
            raise ValueError("Value Error")
        return next_action

    # NOTE(review): the ``def`` line of this method was missing from the
    # source; the name ``parse_new`` and the parameters are reconstructed from
    # the variables the body reads - confirm against the callers.
    def parse_new(self, llm_output, battle):
        """Turn an LLM reply of the form ``{"decision": {"action": ..., "target": ...}}`` into an order.

        The text between the first ``{`` and the last ``}`` of ``llm_output``
        is decoded as JSON.

        Raises:
            ValueError: When no available move or switch matches the reply
                (``json.loads`` failures are ``ValueError`` subclasses too).
            KeyError: When ``decision``, ``action`` or ``target`` is missing.
        """
        json_start = llm_output.find('{')
        json_end = llm_output.rfind('}') + 1  # one past the last closing brace
        llm_action_json = json.loads(llm_output[json_start:json_end])

        next_action = None
        action = llm_action_json["decision"]["action"]
        target = llm_action_json["decision"]["target"]
        # Normalise the target before comparing it with ids / species.
        target = target.replace(" ", "").replace("_", "")

        if action.lower() == "move":
            for move in battle.available_moves:
                if move.id.lower() == target.lower():
                    next_action = self.create_order(move, dynamax=self._should_dynamax(battle))
        elif action.lower() == "switch":
            for pokemon in battle.available_switches:
                if pokemon.species.lower() == target.lower():
                    next_action = self.create_order(pokemon)

        if next_action is None:
            raise ValueError("Value Error")
        return next_action

    def check_status(self, status):
        """Return a label for ``status``.

        Returns ``""`` when ``status`` is falsy, the label matching
        ``status.value`` (1 to 7) otherwise, and ``None`` for any other value.
        """
        if not status:
            return ""
        return self._STATUS_LABELS.get(status.value)

    def boost_multiplier(self, state, level):
        """Return the multiplier for a boost ``level`` of the stat ``state``.

        ``"accuracy"`` uses its own table; every other ``state`` shares one
        table. Returns ``None`` when ``level`` is not between -6 and 6.
        """
        if state == "accuracy":
            table = self._ACCURACY_BOOST_MULTIPLIERS
        else:
            table = self._STAT_BOOST_MULTIPLIERS
        return table.get(level)

    def battle_summary(self):
        """Summarise all battles in ``self.battles``.

        Returns:
            ``(beat_list, remain_list, win_list, tag_list)``. ``beat_list``
            holds, per battle, the sum of ``1 - current_hp_fraction`` over the
            opponent team; ``remain_list`` the sum of ``current_hp_fraction``
            over the own team; ``tag_list`` the battle tags. ``win_list`` gets
            one ``1`` per won battle only, so it is not aligned with the
            other lists.
        """
        beat_list = []
        remain_list = []
        win_list = []
        tag_list = []

        for tag, battle in self.battles.items():
            beat_list.append(
                sum((1 - mon.current_hp_fraction) for mon in battle.opponent_team.values())
            )
            remain_list.append(
                sum(mon.current_hp_fraction for mon in battle.team.values())
            )
            if battle.won:
                win_list.append(1)
            tag_list.append(tag)

        return beat_list, remain_list, win_list, tag_list

    def reward_computing_helper(
        self,
        battle: AbstractBattle,
        *,
        fainted_value: float = 0.0,
        hp_value: float = 0.0,
        number_of_pokemons: int = 6,
        starting_value: float = 0.0,
        status_value: float = 0.0,
        victory_value: float = 1.0,
    ) -> float:
        """Return the change in battle value since the previous call.

        The battle value adds own HP fractions and subtracts opposing ones
        (weighted by ``hp_value``), applies ``fainted_value`` and
        ``status_value`` per Pokémon, counts team slots missing from either
        team (relative to ``number_of_pokemons``) as ``hp_value`` each, and
        applies ``victory_value`` on a win or loss. The value is stored in
        ``self._reward_buffer[battle]``, which starts at ``starting_value``.
        """
        if battle not in self._reward_buffer:
            self._reward_buffer[battle] = starting_value
        current_value = 0

        # Own team.
        for mon in battle.team.values():
            current_value += mon.current_hp_fraction * hp_value
            if mon.fainted:
                current_value -= fainted_value
            elif mon.status is not None:
                current_value -= status_value
        current_value += (number_of_pokemons - len(battle.team)) * hp_value

        # Opposing team.
        for mon in battle.opponent_team.values():
            current_value -= mon.current_hp_fraction * hp_value
            if mon.fainted:
                current_value += fainted_value
            elif mon.status is not None:
                current_value += status_value
        current_value -= (number_of_pokemons - len(battle.opponent_team)) * hp_value

        if battle.won:
            current_value += victory_value
        elif battle.lost:
            current_value -= victory_value

        # The return value is the delta against the previously stored value.
        to_return = current_value - self._reward_buffer[battle]
        self._reward_buffer[battle] = current_value
        return to_return

    def choose_max_damage_move(self, battle: AbstractBattle):
        """Return an order for the available move with the highest base power.

        Falls back to ``choose_random_move`` when there is no available move.
        """
        if battle.available_moves:
            best_move = max(battle.available_moves, key=lambda move: move.base_power)
            return self.create_order(best_move)
        return self.choose_random_move(battle)
.\PokeLLMon\poke_env\player\llama_player.py
# 导入所需的模块
from poke_env.player.gpt_player import LLMPlayer
from poke_env.environment.abstract_battle import AbstractBattle
import json
from peft import PeftModel
import transformers
import torch
from poke_env.player.player import BattleOrder# 设置空字符串作为默认的令牌
# Token passed as ``use_auth_token`` when the tokenizer and model are loaded;
# empty by default.
my_token = ""
# Ignore index (not referenced elsewhere in the visible code).
IGNORE_INDEX = -100
# Default padding token.
DEFAULT_PAD_TOKEN = "[PAD]"
# Default end-of-sequence token.
DEFAULT_EOS_TOKEN = "</s>"
# Default beginning-of-sequence token.
DEFAULT_BOS_TOKEN = "<s>"
# Default unknown token.
DEFAULT_UNK_TOKEN = "<unk>"


# Define the LLAMAPlayer class, which inherits from LLMPlayer.
class LLAMAPlayer(LLMPlayer):
    """LLMPlayer that loads a causal language model and tokenizer on construction."""

    def __init__(self, battle_format,
                 model_name_or_path: str = "",
                 # tokenizer_path: str = "",
                 lora_weights: str = "",
                 model_max_length: int = 2048,
                 w_reason = False,
                 log_dir = "",
                 account_configuration=None,
                 server_configuration=None,
                 ):
        """Initialise the player, then load the tokenizer, model and optional LoRA weights.

        Args:
            battle_format: Forwarded to the parent initialiser.
            model_name_or_path: Model location for ``from_pretrained``; must be
                non-empty.
            lora_weights: Optional LoRA weights location; loaded when truthy.
            model_max_length: ``model_max_length`` given to the tokenizer.
            w_reason: Stored as ``self.w_reason``.
            log_dir: Stored as ``self.log_dir``.
            account_configuration: Forwarded to the parent initialiser.
            server_configuration: Forwarded to the parent initialiser.

        Raises:
            AssertionError: When ``model_name_or_path`` is empty.
        """
        super().__init__(
            battle_format=battle_format,
            account_configuration=account_configuration,
            server_configuration=server_configuration,
        )

        # Bookkeeping attributes.
        self.except_cnt = 0
        self.total_cnt = 0
        self.log_dir = log_dir
        self.w_reason = w_reason
        self.last_output = None
        self.last_state_prompt = None

        assert (model_name_or_path), "Please specify the model path"

        tokenizer_options = dict(
            model_max_length=model_max_length,
            padding_side="right",
            use_fast=False,
            use_auth_token=my_token,
        )
        self.tokenizer = transformers.AutoTokenizer.from_pretrained(
            model_name_or_path, **tokenizer_options
        )

        model_options = dict(
            load_in_8bit=False,
            torch_dtype=torch.float16,
            device_map="auto",
            use_auth_token=my_token,
        )
        self.model = transformers.AutoModelForCausalLM.from_pretrained(
            model_name_or_path, **model_options
        )

        # Wrap the base model with the LoRA weights when they are given.
        if lora_weights:
            print("Recover LoRA weights..")
            self.model = PeftModel.from_pretrained(
                self.model,
                lora_weights,
                torch_dtype=torch.float16,
            )

        print("Loading finished...")
        # Put the model in evaluation mode.
        self.model.eval()
.\PokeLLMon\poke_env\player\openai_api.py
"""This module defines a player class with the OpenAI API on the main thread.
For a black-box implementation consider using the module env_player.
"""
# 导入必要的模块
from __future__ import annotationsimport asyncio
import copy
import random
import time
from abc import ABC, abstractmethod
from logging import Logger
from typing import Any, Awaitable, Callable, Dict, Generic, List, Optional, Tuple, Union# 导入自定义模块
from gymnasium.core import ActType, Env, ObsType
from gymnasium.spaces import Discrete, Space# 导入自定义模块
from poke_env.concurrency import POKE_LOOP, create_in_poke_loop
from poke_env.environment.abstract_battle import AbstractBattle
from poke_env.player.battle_order import BattleOrder, ForfeitBattleOrder
from poke_env.player.player import Player
from poke_env.ps_client import AccountConfiguration
from poke_env.ps_client.server_configuration import (LocalhostServerConfiguration,ServerConfiguration,
)
from poke_env.teambuilder.teambuilder import Teambuilder# 定义一个异步队列类
class _AsyncQueue:
    """Wrapper around an ``asyncio.Queue`` with blocking and awaitable accessors.

    The blocking methods submit the queue coroutine to ``POKE_LOOP`` with
    ``asyncio.run_coroutine_threadsafe`` and wait for its result; the
    ``async_*`` methods await the queue directly.
    """

    def __init__(self, queue: asyncio.Queue[Any]):
        self.queue = queue

    def _wait_on_poke_loop(self, coroutine):
        # Submit the coroutine to POKE_LOOP and block until it completes.
        future = asyncio.run_coroutine_threadsafe(coroutine, POKE_LOOP)
        return future.result()

    async def async_get(self):
        """Await and return the next item of the queue."""
        item = await self.queue.get()
        return item

    def get(self):
        """Block until an item is available and return it."""
        return self._wait_on_poke_loop(self.queue.get())

    async def async_put(self, item: Any):
        """Await putting ``item`` into the queue."""
        await self.queue.put(item)

    def put(self, item: Any):
        """Block until ``item`` has been put into the queue."""
        self._wait_on_poke_loop(self.queue.put(item))

    def empty(self):
        """Return whether the queue is empty."""
        return self.queue.empty()

    def join(self):
        """Block until ``queue.join()`` completes."""
        self._wait_on_poke_loop(self.queue.join())

    async def async_join(self):
        """Await ``queue.join()``."""
        await self.queue.join()


# Define an asynchronous player class.
class _AsyncPlayer(Generic[ObsType, ActType], Player):
    """Player that exchanges observations and actions with an environment through two queues."""

    actions: _AsyncQueue
    observations: _AsyncQueue

    def __init__(
        self,
        user_funcs: OpenAIGymEnv[ObsType, ActType],
        username: str,
        **kwargs: Any,
    ):
        """Create the player and its two single-slot queues.

        Args:
            user_funcs: Environment whose ``embed_battle`` and
                ``action_to_move`` are used to translate battles and actions.
            username: Temporarily used as the class name while the parent
                initialiser runs.
            **kwargs: Forwarded to the parent initialiser.
        """
        # The class name is swapped only for the duration of the parent
        # initialiser. NOTE(review): presumably the parent derives a default
        # name from the class name - confirm.
        self.__class__.__name__ = username
        super().__init__(**kwargs)
        self.__class__.__name__ = "_AsyncPlayer"

        self.observations = _AsyncQueue(create_in_poke_loop(asyncio.Queue, 1))
        self.actions = _AsyncQueue(create_in_poke_loop(asyncio.Queue, 1))
        self.current_battle: Optional[AbstractBattle] = None
        # Read by _env_move and _battle_finished_callback; this assignment was
        # missing, which made both fail with AttributeError.
        self._user_funcs = user_funcs

    def choose_move(self, battle: AbstractBattle) -> Awaitable[BattleOrder]:
        """Return the awaitable produced by ``_env_move`` for ``battle``."""
        return self._env_move(battle)

    async def _env_move(self, battle: AbstractBattle) -> BattleOrder:
        """Publish the embedded battle, wait for an action and convert it to an order.

        Raises:
            RuntimeError: When ``battle`` differs from the battle currently
                tracked and that battle has not finished.
        """
        # Start tracking a new battle once the previous one is done.
        if not self.current_battle or self.current_battle.finished:
            self.current_battle = battle
        if not self.current_battle == battle:
            raise RuntimeError("Using different battles for queues")

        battle_to_send = self._user_funcs.embed_battle(battle)
        await self.observations.async_put(battle_to_send)

        action = await self.actions.async_get()
        # -1 is the sentinel for forfeiting the battle.
        if action == -1:
            return ForfeitBattleOrder()
        return self._user_funcs.action_to_move(action, battle)

    def _battle_finished_callback(self, battle: AbstractBattle):
        """Publish the embedding of the finished battle to the observations queue."""
        to_put = self._user_funcs.embed_battle(battle)
        # Scheduled on POKE_LOOP; the result is not awaited here.
        asyncio.run_coroutine_threadsafe(self.observations.async_put(to_put), POKE_LOOP)
# Define a metaclass that inherits from the type of ABC.
class _ABCMetaclass(type(ABC)):
    """Subclass of ``type(ABC)`` with no additions."""


# Define a metaclass that inherits from the type of Env.
class _EnvMetaclass(type(Env)):
    """Subclass of ``type(Env)`` with no additions."""


# Define a metaclass that inherits from both _EnvMetaclass and _ABCMetaclass.
class _OpenAIGymEnvMetaclass(_EnvMetaclass, _ABCMetaclass):
    """Metaclass combining ``_EnvMetaclass`` and ``_ABCMetaclass``, with no additions."""


# Define the OpenAIGymEnv class, which inherits from Env[ObsType, ActType] and
# ABC and uses _OpenAIGymEnvMetaclass as its metaclass.
class OpenAIGymEnv(
    Env[ObsType, ActType],
    ABC,
    metaclass=_OpenAIGymEnvMetaclass,
):
    """Base class implementing the OpenAI Gym API on the main thread."""

    # Number of polls made by reset() while waiting for the agent's battle.
    _INIT_RETRIES = 100
    # Seconds slept between two of those polls.
    _TIME_BETWEEN_RETRIES = 0.5
    # Number of polls made while waiting for a previous challenge task to end.
    _SWITCH_CHALLENGE_TASK_RETRIES = 30
    # Seconds slept between two of those polls.
    _TIME_BETWEEN_SWITCH_RETIRES = 1

    def __init__(
        self,
        account_configuration: Optional[AccountConfiguration] = None,
        *,
        avatar: Optional[int] = None,
        battle_format: str = "gen8randombattle",
        log_level: Optional[int] = None,
        save_replays: Union[bool, str] = False,
        server_configuration: Optional[
            ServerConfiguration
        ] = LocalhostServerConfiguration,
        start_timer_on_battle_start: bool = False,
        start_listening: bool = True,
        ping_interval: Optional[float] = 20.0,
        ping_timeout: Optional[float] = 20.0,
        team: Optional[Union[str, Teambuilder]] = None,
        start_challenging: bool = False,
    ):
        """Create the underlying agent and the environment's bookkeeping state.

        Every parameter except ``start_challenging`` is forwarded as a keyword
        argument to the internal ``_AsyncPlayer``.

        :param start_challenging: Whether to start the challenge loop right
            away.
        :type start_challenging: bool
        """
        # NOTE(review): the body of this initializer was missing from the
        # source this block was recovered from. It is rebuilt from the
        # attributes the other methods read (agent, _actions, _observations,
        # current_battle, last_battle, _keep_challenging, _challenge_task,
        # _seed_initialized) - confirm against the reference implementation,
        # including whether extra keyword arguments such as
        # max_concurrent_battles should be passed to the agent.
        self.agent = _AsyncPlayer(
            self,
            username=self.__class__.__name__,  # type: ignore
            account_configuration=account_configuration,
            avatar=avatar,
            battle_format=battle_format,
            log_level=log_level,
            save_replays=save_replays,
            server_configuration=server_configuration,
            start_timer_on_battle_start=start_timer_on_battle_start,
            start_listening=start_listening,
            ping_interval=ping_interval,
            ping_timeout=ping_timeout,
            team=team,
        )
        self._actions = self.agent.actions
        self._observations = self.agent.observations
        # NOTE(review): assumes Discrete is imported next to Space - confirm.
        self.action_space = Discrete(self.action_space_size())
        self.observation_space = self.describe_embedding()
        self.current_battle: Optional[AbstractBattle] = None
        self.last_battle: Optional[AbstractBattle] = None
        self._keep_challenging: bool = False
        self._challenge_task = None
        self._seed_initialized: bool = False
        if start_challenging:
            self._keep_challenging = True
            self._challenge_task = asyncio.run_coroutine_threadsafe(
                self._challenge_loop(), POKE_LOOP
            )

    @abstractmethod
    def calc_reward(
        self, last_battle: AbstractBattle, current_battle: AbstractBattle
    ) -> float:
        """
        Returns the reward for the current battle state. The battle state in
        the previous turn is given as well and can be used for comparisons.

        :param last_battle: The battle state in the previous turn.
        :type last_battle: AbstractBattle
        :param current_battle: The current battle state.
        :type current_battle: AbstractBattle
        :return: The reward for current_battle.
        :rtype: float
        """
        pass

    @abstractmethod
    def action_to_move(self, action: int, battle: AbstractBattle) -> BattleOrder:
        """
        Returns the BattleOrder relative to the given action.

        :param action: The action to take.
        :type action: int
        :param battle: The current battle state
        :type battle: AbstractBattle
        :return: The battle order for the given action in context of the
            current battle.
        :rtype: BattleOrder
        """
        pass

    @abstractmethod
    def embed_battle(self, battle: AbstractBattle) -> ObsType:
        """
        Returns the embedding of the current battle state in a format
        compatible with the OpenAI gym API.

        :param battle: The current battle state.
        :type battle: AbstractBattle
        :return: The embedding of the current battle state.
        """
        pass

    @abstractmethod
    def describe_embedding(self) -> Space[ObsType]:
        """
        Returns the description of the embedding. It must return a Space
        specifying low bounds and high bounds.

        :return: The description of the embedding.
        :rtype: Space
        """
        pass

    @abstractmethod
    def action_space_size(self) -> int:
        """
        Returns the size of the action space. Given size x, the action space
        goes from 0 to x - 1.

        :return: The action space size.
        :rtype: int
        """
        pass

    @abstractmethod
    def get_opponent(self) -> Union[Player, str, List[Player], List[str]]:
        """
        Returns the opponent (or list of opponents) that will be challenged
        on the next iteration of the challenge loop. If a list is returned,
        a random element will be chosen at random during the challenge loop.

        :return: The opponent (or list of opponents).
        :rtype: Player or str or list(Player) or list(str)
        """
        pass

    def _get_opponent(self) -> Union[Player, str]:
        """Return a single opponent, picking one at random from a list."""
        opponent = self.get_opponent()
        random_opponent = (
            random.choice(opponent) if isinstance(opponent, list) else opponent
        )
        return random_opponent

    def reset(
        self,
        *,
        seed: Optional[int] = None,
        options: Optional[Dict[str, Any]] = None,
    ) -> Tuple[ObsType, Dict[str, Any]]:
        """Move on to the agent's next battle and return its first observation.

        :param seed: Seed passed to the parent ``reset``. When omitted on the
            first call, the current time is used instead.
        :type seed: int, optional
        :param options: Unused by this implementation.
        :type options: Dict[str, Any], optional
        :return: The first observation and the additional info dictionary.
        :rtype: Tuple[ObsType, Dict[str, Any]]
        :raises RuntimeError: If the agent never enters a battle, or if the
            environment and the agent track different unfinished battles.
        """
        if seed is not None:
            super().reset(seed=seed)  # type: ignore
            self._seed_initialized = True
        elif not self._seed_initialized:
            super().reset(seed=int(time.time()))  # type: ignore
            self._seed_initialized = True
        # Wait (bounded) for the agent to enter a battle.
        if not self.agent.current_battle:
            count = self._INIT_RETRIES
            while not self.agent.current_battle:
                if count == 0:
                    raise RuntimeError("Agent is not challenging")
                count -= 1
                time.sleep(self._TIME_BETWEEN_RETRIES)
        # An unfinished battle is forfeited (-1) and its final observation
        # is discarded.
        if self.current_battle and not self.current_battle.finished:
            if self.current_battle == self.agent.current_battle:
                self._actions.put(-1)
                self._observations.get()
            else:
                raise RuntimeError(
                    "Environment and agent aren't synchronized. Try to restart"
                )
        # Wait until the agent has switched to a different battle.
        while self.current_battle == self.agent.current_battle:
            time.sleep(0.01)
        self.current_battle = self.agent.current_battle
        # The logger is detached from the shallow copy before deep-copying.
        battle = copy.copy(self.current_battle)
        battle.logger = None
        self.last_battle = copy.deepcopy(battle)
        return self._observations.get(), self.get_additional_info()

    def get_additional_info(self) -> Dict[str, Any]:
        """
        Returns additional info for the reset method.
        Override only if you really need it.

        :return: Additional information as a Dict
        :rtype: Dict
        """
        return {}

    def step(
        self, action: ActType
    ) -> Tuple[ObsType, float, bool, bool, Dict[str, Any]]:
        """
        Execute the specified action in the environment.

        :param ActType action: The action to be executed.
        :return: A tuple containing the new observation, reward, termination
            flag, truncation flag, and info dictionary.
        :rtype: Tuple[ObsType, float, bool, bool, Dict[str, Any]]
        :raises RuntimeError: If the current battle is already finished.
        """
        # Without a battle, step behaves like reset with a zero reward.
        if not self.current_battle:
            obs, info = self.reset()
            return obs, 0.0, False, False, info
        if self.current_battle.finished:
            raise RuntimeError("Battle is already finished, call reset")
        # Snapshot the state before acting; the logger is detached from the
        # shallow copy before deep-copying.
        battle = copy.copy(self.current_battle)
        battle.logger = None
        self.last_battle = copy.deepcopy(battle)
        self._actions.put(action)
        observation = self._observations.get()
        reward = self.calc_reward(self.last_battle, self.current_battle)
        terminated = False
        truncated = False
        if self.current_battle.finished:
            size = self.current_battle.team_size
            remaining_mons = size - len(
                [mon for mon in self.current_battle.team.values() if mon.fainted]
            )
            remaining_opponent_mons = size - len(
                [
                    mon
                    for mon in self.current_battle.opponent_team.values()
                    if mon.fainted
                ]
            )
            # Terminated when exactly one side has no mon left; any other
            # finished battle counts as truncated.
            if (remaining_mons == 0) != (remaining_opponent_mons == 0):
                terminated = True
            else:
                truncated = True
        return observation, reward, terminated, truncated, self.get_additional_info()

    def render(self, mode: str = "human"):
        """Print a one-line summary of the current turn, if a battle exists.

        :param mode: Unused by this implementation.
        :type mode: str
        """
        if self.current_battle is not None:
            print(
                " Turn %4d. | [%s][%3d/%3dhp] %10.10s - %10.10s [%3d%%hp][%s]"
                % (
                    self.current_battle.turn,
                    "".join(
                        [
                            "⦻" if mon.fainted else "●"
                            for mon in self.current_battle.team.values()
                        ]
                    ),
                    self.current_battle.active_pokemon.current_hp or 0,
                    self.current_battle.active_pokemon.max_hp or 0,
                    self.current_battle.active_pokemon.species,
                    self.current_battle.opponent_active_pokemon.species,
                    self.current_battle.opponent_active_pokemon.current_hp or 0,
                    "".join(
                        [
                            "⦻" if mon.fainted else "●"
                            for mon in self.current_battle.opponent_team.values()
                        ]
                    ),
                ),
                # Overwrite the same line until the battle is finished.
                end="\n" if self.current_battle.finished else "\r",
            )

    def close(self, purge: bool = True):
        """Stop the challenge loop and block until it has been stopped.

        :param purge: Whether the agent's battles are reset afterwards.
        :type purge: bool
        """
        if self.current_battle is None or self.current_battle.finished:
            time.sleep(1)
            if self.current_battle != self.agent.current_battle:
                self.current_battle = self.agent.current_battle
        closing_task = asyncio.run_coroutine_threadsafe(
            self._stop_challenge_loop(purge=purge), POKE_LOOP
        )
        closing_task.result()

    def background_send_challenge(self, username: str):
        """
        Sends a single challenge to a specified player asynchronously. The
        function immediately returns to allow use of the OpenAI gym API.

        :param username: The username of the player to challenge.
        :type username: str
        :raises RuntimeError: If a challenge task is still running.
        """
        if self._challenge_task and not self._challenge_task.done():
            raise RuntimeError(
                "Agent is already challenging opponents with the challenging loop. "
                "Try to specify 'start_challenging=True' during instantiation or call "
                "'await agent.stop_challenge_loop()' to clear the task."
            )
        self._challenge_task = asyncio.run_coroutine_threadsafe(
            self.agent.send_challenges(username, 1), POKE_LOOP
        )

    def background_accept_challenge(self, username: str):
        """
        Accepts a single challenge from a specified player asynchronously. The
        function immediately returns to allow use of the OpenAI gym API.

        :param username: The username of the player whose challenge is
            accepted.
        :type username: str
        :raises RuntimeError: If a challenge task is still running.
        """
        if self._challenge_task and not self._challenge_task.done():
            raise RuntimeError(
                "Agent is already challenging opponents with the challenging loop. "
                "Try to specify 'start_challenging=True' during instantiation or call "
                "'await agent.stop_challenge_loop()' to clear the task."
            )
        self._challenge_task = asyncio.run_coroutine_threadsafe(
            self.agent.accept_challenges(username, 1, self.agent.next_team),
            POKE_LOOP,
        )

    async def _challenge_loop(
        self,
        n_challenges: Optional[int] = None,
        callback: Optional[Callable[[AbstractBattle], None]] = None,
    ):
        """Challenge opponents, either a fixed number of times or until stopped.

        :param n_challenges: Number of challenges. When falsy, the loop runs
            while ``self._keep_challenging`` is true.
        :type n_challenges: int, optional
        :param callback: Called after each challenge with a deep copy of the
            current battle, when there is one.
        :type callback: Callable[[AbstractBattle], None], optional
        :raises ValueError: If ``n_challenges`` is negative.
        """
        if not n_challenges:
            while self._keep_challenging:
                opponent = self._get_opponent()
                if isinstance(opponent, Player):
                    await self.agent.battle_against(opponent, 1)
                else:
                    await self.agent.send_challenges(opponent, 1)
                if callback and self.current_battle is not None:
                    callback(copy.deepcopy(self.current_battle))
        elif n_challenges > 0:
            for _ in range(n_challenges):
                opponent = self._get_opponent()
                if isinstance(opponent, Player):
                    await self.agent.battle_against(opponent, 1)
                else:
                    await self.agent.send_challenges(opponent, 1)
                if callback and self.current_battle is not None:
                    callback(copy.deepcopy(self.current_battle))
        else:
            raise ValueError(f"Number of challenges must be > 0. Got {n_challenges}")

    def start_challenging(
        self,
        n_challenges: Optional[int] = None,
        callback: Optional[Callable[[AbstractBattle], None]] = None,
    ):
        """
        Starts the challenge loop.

        :param n_challenges: The number of challenges to send. If empty it
            will run until stopped.
        :type n_challenges: int, optional
        :param callback: The function to callback after each challenge with a
            copy of the final battle state.
        :type callback: Callable[[AbstractBattle], None], optional
        :raises RuntimeError: If a previous challenge task does not finish
            within the retry budget.
        """
        # Give a running challenge task a bounded amount of time to finish.
        if self._challenge_task and not self._challenge_task.done():
            count = self._SWITCH_CHALLENGE_TASK_RETRIES
            while not self._challenge_task.done():
                if count == 0:
                    raise RuntimeError("Agent is already challenging")
                count -= 1
                time.sleep(self._TIME_BETWEEN_SWITCH_RETIRES)
        if not n_challenges:
            self._keep_challenging = True
        self._challenge_task = asyncio.run_coroutine_threadsafe(
            self._challenge_loop(n_challenges, callback), POKE_LOOP
        )

    async def _ladder_loop(
        self,
        n_challenges: Optional[int] = None,
        callback: Optional[Callable[[AbstractBattle], None]] = None,
    ):
        """Play ladder games, either a fixed number of times or until stopped.

        :param n_challenges: Number of ladder games. When falsy, the loop runs
            while ``self._keep_challenging`` is true.
        :type n_challenges: int, optional
        :param callback: Called after each game with a deep copy of the
            current battle, when there is one.
        :type callback: Callable[[AbstractBattle], None], optional
        :raises ValueError: If ``n_challenges`` is negative.
        """
        if n_challenges:
            if n_challenges <= 0:
                raise ValueError(
                    f"Number of challenges must be > 0. Got {n_challenges}"
                )
            for _ in range(n_challenges):
                await self.agent.ladder(1)
                if callback and self.current_battle is not None:
                    callback(copy.deepcopy(self.current_battle))
        else:
            while self._keep_challenging:
                await self.agent.ladder(1)
                if callback and self.current_battle is not None:
                    callback(copy.deepcopy(self.current_battle))

    def start_laddering(
        self,
        n_challenges: Optional[int] = None,
        callback: Optional[Callable[[AbstractBattle], None]] = None,
    ):
        """
        Starts the laddering loop.

        :param n_challenges: The number of ladder games to play. If empty it
            will run until stopped.
        :type n_challenges: int, optional
        :param callback: The function to callback after each challenge with a
            copy of the final battle state.
        :type callback: Callable[[AbstractBattle], None], optional
        :raises RuntimeError: If a previous challenge task does not finish
            within the retry budget.
        """
        # Give a running challenge task a bounded amount of time to finish.
        if self._challenge_task and not self._challenge_task.done():
            count = self._SWITCH_CHALLENGE_TASK_RETRIES
            while not self._challenge_task.done():
                if count == 0:
                    raise RuntimeError("Agent is already challenging")
                count -= 1
                time.sleep(self._TIME_BETWEEN_SWITCH_RETIRES)
        if not n_challenges:
            self._keep_challenging = True
        self._challenge_task = asyncio.run_coroutine_threadsafe(
            self._ladder_loop(n_challenges, callback), POKE_LOOP
        )

    async def _stop_challenge_loop(
        self, force: bool = True, wait: bool = True, purge: bool = False
    ):
        """Stop the challenge loop and clear the environment's battle state.

        :param force: Whether an unfinished battle is forfeited.
        :type force: bool
        :param wait: Whether to wait for the challenge task to finish.
        :type wait: bool
        :param purge: Whether the agent's battles are reset at the end.
        :type purge: bool
        :raises RuntimeError: If actions are still queued two seconds after
            the stop was requested.
        """
        self._keep_challenging = False

        if force:
            if self.current_battle and not self.current_battle.finished:
                if not self._actions.empty():
                    await asyncio.sleep(2)
                    if not self._actions.empty():
                        raise RuntimeError(
                            "The agent is still sending actions. "
                            "Use this method only when training or "
                            "evaluation are over."
                        )
                if not self._observations.empty():
                    await self._observations.async_get()
                # -1 asks the agent to forfeit the running battle.
                await self._actions.async_put(-1)

        if wait and self._challenge_task:
            while not self._challenge_task.done():
                await asyncio.sleep(1)
            self._challenge_task.result()

        self._challenge_task = None
        self.current_battle = None
        self.agent.current_battle = None
        # Drain whatever is left in both queues.
        while not self._actions.empty():
            await self._actions.async_get()
        while not self._observations.empty():
            await self._observations.async_get()

        if purge:
            self.agent.reset_battles()

    def reset_battles(self):
        """Resets the player's inner battle tracker."""
        self.agent.reset_battles()

    def done(self, timeout: Optional[int] = None) -> bool:
        """
        Returns True if the task is done or is done after the timeout, false
        otherwise.

        :param timeout: The amount of time to wait for if the task is not
            already done. If empty it will wait until the task is done.
        :type timeout: int, optional
        :return: True if the task is done or if the task gets completed after
            the timeout.
        :rtype: bool
        """
        if self._challenge_task is None:
            return True
        if timeout is None:
            self._challenge_task.result()
            return True
        if self._challenge_task.done():
            return True
        time.sleep(timeout)
        return self._challenge_task.done()

    # Expose properties of the Player class.

    @property
    def battles(self) -> Dict[str, AbstractBattle]:
        return self.agent.battles

    @property
    def format(self) -> str:
        return self.agent.format

    @property
    def format_is_doubles(self) -> bool:
        return self.agent.format_is_doubles

    @property
    def n_finished_battles(self) -> int:
        return self.agent.n_finished_battles

    @property
    def n_lost_battles(self) -> int:
        return self.agent.n_lost_battles

    @property
    def n_tied_battles(self) -> int:
        return self.agent.n_tied_battles

    @property
    def n_won_battles(self) -> int:
        return self.agent.n_won_battles

    @property
    def win_rate(self) -> float:
        return self.agent.win_rate

    # Expose properties of the player's network interface.

    @property
    def logged_in(self) -> asyncio.Event:
        """
        Event object associated with user login.

        :return: The logged-in event
        :rtype: Event
        """
        return self.agent.ps_client.logged_in

    @property
    def logger(self) -> Logger:
        """
        Logger associated with the player.

        :return: The logger.
        :rtype: Logger
        """
        return self.agent.logger

    @property
    def username(self) -> str:
        """
        The player's username.

        :return: The player's username.
        :rtype: str
        """
        return self.agent.username

    @property
    def websocket_url(self) -> str:
        """
        The websocket url.
        It is derived from the server url.

        :return: The websocket url.
        :rtype: str
        """
        return self.agent.ps_client.websocket_url

    def __getattr__(self, item: str):
        """Delegate lookups of attributes missing on the environment to the agent.

        :raises AttributeError: If ``agent`` itself is not set yet.
        """
        # Without this guard, looking up anything before ``agent`` exists
        # (e.g. on an instance being copied or unpickled) would re-enter
        # __getattr__("agent") endlessly and end in RecursionError.
        if item == "agent":
            raise AttributeError(item)
        return getattr(self.agent, item)
相关文章:
PokéLLMon 源码解析(四)
.\PokeLLMon\poke_env\exceptions.py """ This module contains exceptions. """# 定义一个自定义异常类 ShowdownException,继承自内置异常类 Exception class ShowdownException(Exception):"""This exception is …...
区块链基础知识01
区块链:区块链技术是一种高级数据库机制,允许在企业网络中透明地共享信息。区块链数据库将数据存储在区块中,而数据库则一起链接到一个链条中。数据在时间上是一致的,在没有网络共识的情况下,不能删除或修改链条。 即&…...

YOLOv9(2):YOLOv9网络结构
1. 前言 本文仅以官方提供的yolov9.yaml来进行简要讲解。 讲解之前,还是要做一些简单的铺垫。 Slice层不做任何的操作,纯粹是做一个占位层。这样一来,在parse_model时,ch[n]可表示第n层的输出通道。 Detect和DDetect主要区别还…...

提取b站字幕(视频字幕、AI字幕)
提取b站字幕(视频字幕、AI字幕) 1. 打开视频 2. 按 F12 进入开发者界面 视频自己的字幕输入的是 json,如果是AI字幕则需要输入 ai_subtitle 3. 进入这个网址:https://www.dreamlyn.cn/bsrt...

JAVA程序员如何快速熟悉新项目?
文章目录 Java程序员快速熟悉一个新项目的步骤通常包括以下几个方面:实例展示:Java程序员加入新项目时可能遇到的技术难题及其解决方案包括: Java程序员快速熟悉一个新项目的步骤通常包括以下几个方面: 理解项目背景和目标&#x…...

慢sql优化记录1
慢sql为: select count(*) from t_wf_process p left join t_wf_core_dofile dofile on p.wf_instance_uid dofile.instanceid join zwkj_department d on p.userdeptid d.department_guid ,t_wf_core_item i,wf_node n where (p.IS_DUPLICATE ! true or p.IS_DU…...

堆和堆排序
堆排序是一种与插入排序和归并排序十分不同的算法。 优先级队列 Priority Queue 优先级队列是类似于常规队列或堆栈数据结构的抽象数据类型(ADT)。优先级队列中的每个元素都有一个相关联的优先级key。在优先级队列中,高优先级的元素优先于…...

STM32 | 零基础 STM32 第一天
零基础 STM32 第一天 一、认知STM32 1、STM32概念 STM32:意法半导体基于ARM公司的Cortex-M内核开发的32位的高性能、低功耗单片机。 ST:意法半导体 M:基于ARM公司的Cortex-M内核的高性能、低功耗单片机 32:32位单片机 2、STM32开发的产品 STM32开发的产品&a…...

day16_购物车(添加购物车,购物车列表查询,删除购物车商品,更新选中商品状态,完成购物车商品的全选,清空购物车)
文章目录 购物车模块1 需求说明2 环境搭建3 添加购物车3.1 需求说明3.2 远程调用接口开发3.2.1 ProductController3.2.2 ProductService 3.3 openFeign接口定义3.3.1 环境搭建3.3.2 接口定义3.3.3 降级类定义 3.4 业务后端接口开发3.4.1 添加依赖3.4.2 修改启动类3.4.3 CartInf…...

基于Spring Boot的图书个性化推荐系统 ,计算机毕业设计(带源码+论文)
源码获取地址: 码呢-一个专注于技术分享的博客平台一个专注于技术分享的博客平台,大家以共同学习,乐于分享,拥抱开源的价值观进行学习交流http://www.xmbiao.cn/resource-details/1765769136268455938...

libevent源码解析:定时器事件(三)
文章目录 前言一、用例小根堆管理定时器事件小根堆和链表管理定时器事件区别 二、基本数据结构介绍结构体成员分析小根堆和链表common_timeout图示 三、源码分析小根堆管理定时器事件event_newevent_addevent_dispatch 链表common_timeout管理定时器事件event_base_init_common…...

3D资产管理
3D 资产管理是指组织、跟踪、优化和分发 3D 模型和资产以用于游戏、电影、AR/VR 体验等各种应用的过程。 3D资产管理也称为3D内容管理。 随着游戏、电影、建筑、工程等行业中 3D 内容的增长,实施有效的资产管理工作流程对于提高生产力、减少错误、简化工作流程以及使…...

鸿蒙Harmony应用开发—ArkTS声明式开发(基础手势:Blank)
空白填充组件,在容器主轴方向上,空白填充组件具有自动填充容器空余部分的能力。仅当父组件为Row/Column/Flex时生效。 说明: 该组件从API Version 7开始支持。后续版本如有新增内容,则采用上角标单独标记该内容的起始版本。 子组件…...

【手游联运平台搭建】游戏平台的作用
随着科技的不断发展,游戏行业也在不断壮大,而游戏平台作为连接玩家与游戏的桥梁,发挥着越来越重要的作用。游戏平台不仅为玩家提供了便捷的游戏体验,还为游戏开发者提供了广阔的市场和推广渠道。本文将从多个方面探讨游戏平台的作…...
手把手教会你 - StreamAPI基本用法
1. 简介 目前响应式编程的学习中很多时候都用到了Lambda表达式和StreamAPI,那么今天就在这里记录一下一些最基本的使用方法。 StreamAPI中引入了流的概念,其将集合看作一种流,流在管道中传输(动态的),可以…...

和为K的子数组
题目: 使用前缀和的方法可以解决这个问题,因为我们需要找到和为k的连续子数组的个数。通过计算前缀和,我们可以将问题转化为求解两个前缀和之差等于k的情况。 假设数组的前缀和数组为prefixSum,其中prefixSum[i]表示从数组起始位…...

Redis:java中redis的基本使用(springboot)
文章目录 springboot中使用redisspringboot 连接 redis三种方式导入依赖增删改查小练习 springboot中使用redis springboot 连接 redis三种方式 jedis (redis官方提供的)springboot自带的redisson (基于jedis优化的,性能最好,使…...
微型计算机技术
摘要:微型计算机是通用计算机的一个重要发展分支,自1981年美国IBM公司推出第一代商用微型计算机以来,微型计算机迅速进入社会各个领域,且技术不断更新、产品快速换代,已成为人们工作和生活中不可缺少的基本工具。 一、微型计算机技术发展历史 1.第一代微处理器(19…...

mysql下载教程
什么是mysql MySQL是一种开源的关系型数据库管理系统,由瑞典MySQL AB公司开发,现在由Oracle公司维护。MySQL支持多个操作系统,包括Linux、Windows、macOS等。它是一种客户端/服务器模式的数据库,提供高效、可靠、稳定的数据存储和…...

ResponseStatusException
目录 概述: 综合实例: 继承 ResponseStatusException-自定义异常类 继承 ResponseStatusException-自定义响应头信息 继承 ResponseStatusException-定制更多异常处理逻辑 继承 ResponseStatusException-根据异常发生的上下文动态改变 HTTP 状态码…...

多云管理“拦路虎”:深入解析网络互联、身份同步与成本可视化的技术复杂度
一、引言:多云环境的技术复杂性本质 企业采用多云策略已从技术选型升维至生存刚需。当业务系统分散部署在多个云平台时,基础设施的技术债呈现指数级积累。网络连接、身份认证、成本管理这三大核心挑战相互嵌套:跨云网络构建数据…...

使用VSCode开发Django指南
使用VSCode开发Django指南 一、概述 Django 是一个高级 Python 框架,专为快速、安全和可扩展的 Web 开发而设计。Django 包含对 URL 路由、页面模板和数据处理的丰富支持。 本文将创建一个简单的 Django 应用,其中包含三个使用通用基本模板的页面。在此…...

【OSG学习笔记】Day 18: 碰撞检测与物理交互
物理引擎(Physics Engine) 物理引擎 是一种通过计算机模拟物理规律(如力学、碰撞、重力、流体动力学等)的软件工具或库。 它的核心目标是在虚拟环境中逼真地模拟物体的运动和交互,广泛应用于 游戏开发、动画制作、虚…...

K8S认证|CKS题库+答案| 11. AppArmor
目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...
MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例
一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...

以下是对华为 HarmonyOS NETX 5属性动画(ArkTS)文档的结构化整理,通过层级标题、表格和代码块提升可读性:
一、属性动画概述NETX 作用:实现组件通用属性的渐变过渡效果,提升用户体验。支持属性:width、height、backgroundColor、opacity、scale、rotate、translate等。注意事项: 布局类属性(如宽高)变化时&#…...
sqlserver 根据指定字符 解析拼接字符串
DECLARE LotNo NVARCHAR(50)A,B,C DECLARE xml XML ( SELECT <x> REPLACE(LotNo, ,, </x><x>) </x> ) DECLARE ErrorCode NVARCHAR(50) -- 提取 XML 中的值 SELECT value x.value(., VARCHAR(MAX))…...
【碎碎念】宝可梦 Mesh GO : 基于MESH网络的口袋妖怪 宝可梦GO游戏自组网系统
目录 游戏说明《宝可梦 Mesh GO》 —— 局域宝可梦探索Pokmon GO 类游戏核心理念应用场景Mesh 特性 宝可梦玩法融合设计游戏构想要素1. 地图探索(基于物理空间 广播范围)2. 野生宝可梦生成与广播3. 对战系统4. 道具与通信5. 延伸玩法 安全性设计 技术选…...
MySQL账号权限管理指南:安全创建账户与精细授权技巧
在MySQL数据库管理中,合理创建用户账号并分配精确权限是保障数据安全的核心环节。直接使用root账号进行所有操作不仅危险且难以审计操作行为。今天我们来全面解析MySQL账号创建与权限分配的专业方法。 一、为何需要创建独立账号? 最小权限原则…...
JS设计模式(4):观察者模式
JS设计模式(4):观察者模式 一、引入 在开发中,我们经常会遇到这样的场景:一个对象的状态变化需要自动通知其他对象,比如: 电商平台中,商品库存变化时需要通知所有订阅该商品的用户;新闻网站中࿰…...