当前位置: 首页 > news >正文

PokéLLMon 源码解析(四)

.\PokeLLMon\poke_env\exceptions.py

"""
This module contains exceptions.
"""# 定义一个自定义异常类 ShowdownException,继承自内置异常类 Exception
class ShowdownException(Exception):"""This exception is raised when a non-managed messageis received from the server."""# 当从服务器接收到非受控消息时引发此异常pass

.\PokeLLMon\poke_env\player\baselines.py

# 导入必要的模块
from typing import List
import json
import osfrom poke_env.environment.abstract_battle import AbstractBattle
from poke_env.environment.double_battle import DoubleBattle
from poke_env.environment.move_category import MoveCategory
from poke_env.environment.pokemon import Pokemon
from poke_env.environment.side_condition import SideCondition
from poke_env.player.player import Player
from poke_env.data.gen_data import GenData# 从文件中加载招式效果数据
with open("./poke_env/data/static/moves/moves_effect.json", "r") as f:move_effect = json.load(f)# 计算招式类型的伤害倍率
def calculate_move_type_damage_multipier(type_1, type_2, type_chart, constraint_type_list):# 定义所有可能的宝可梦类型TYPE_list = 'BUG,DARK,DRAGON,ELECTRIC,FAIRY,FIGHTING,FIRE,FLYING,GHOST,GRASS,GROUND,ICE,NORMAL,POISON,PSYCHIC,ROCK,STEEL,WATER'.split(",")move_type_damage_multiplier_list = []# 如果存在第二个类型if type_2:# 计算每种类型对应的伤害倍率for type in TYPE_list:move_type_damage_multiplier_list.append(type_chart[type_1][type] * type_chart[type_2][type])move_type_damage_multiplier_dict = dict(zip(TYPE_list, move_type_damage_multiplier_list))else:move_type_damage_multiplier_dict = type_chart[type_1]effective_type_list = []extreme_type_list = []resistant_type_list = []extreme_resistant_type_list = []immune_type_list = []# 根据伤害倍率将类型分为不同的类别for type, value in move_type_damage_multiplier_dict.items():if value == 2:effective_type_list.append(type)elif value == 4:extreme_type_list.append(type)elif value == 1 / 2:resistant_type_list.append(type)elif value == 1 / 4:extreme_resistant_type_list.append(type)elif value == 0:immune_type_list.append(type)else:  # value == 1continue# 如果约束类型列表不为空if constraint_type_list:# 更新极端类型列表,取交集extreme_type_list = list(set(extreme_type_list).intersection(set(constraint_type_list)))# 更新有效类型列表,取交集effective_type_list = list(set(effective_type_list).intersection(set(constraint_type_list)))# 更新抗性类型列表,取交集resistant_type_list = list(set(resistant_type_list).intersection(set(constraint_type_list)))# 更新极端抗性类型列表,取交集extreme_resistant_type_list = list(set(extreme_resistant_type_list).intersection(set(constraint_type_list)))# 更新免疫类型列表,取交集immune_type_list = list(set(immune_type_list).intersection(set(constraint_type_list)))# 返回更新后的各类型列表return extreme_type_list, effective_type_list, resistant_type_list, extreme_resistant_type_list, immune_type_list
# 定义一个函数,根据给定的参数计算并返回对应的移动类型伤害提示
def move_type_damage_wraper(pokemon_name, type_1, type_2, type_chart, constraint_type_list=None):# 初始化移动类型伤害提示字符串move_type_damage_prompt = ""# 调用函数计算移动类型伤害倍数,得到各种类型的列表extreme_effective_type_list, effective_type_list, resistant_type_list, extreme_resistant_type_list, immune_type_list = calculate_move_type_damage_multipier(type_1, type_2, type_chart, constraint_type_list)# 如果存在有效的、抵抗的或免疫的类型列表if effective_type_list or resistant_type_list or immune_type_list:# 构建移动类型伤害提示字符串move_type_damage_prompt = f"{pokemon_name}"if extreme_effective_type_list:move_type_damage_prompt = move_type_damage_prompt + " can be super-effectively attacked by " + ", ".join(extreme_effective_type_list) + " moves"if effective_type_list:move_type_damage_prompt = move_type_damage_prompt + ", can be effectively attacked by " + ", ".join(effective_type_list) + " moves"if resistant_type_list:move_type_damage_prompt = move_type_damage_prompt + ", is resistant to " + ", ".join(resistant_type_list) + " moves"if extreme_resistant_type_list:move_type_damage_prompt = move_type_damage_prompt + ", is super-resistant to " + ", ".join(extreme_resistant_type_list) + " moves"if immune_type_list:move_type_damage_prompt = move_type_damage_prompt + ", is immuned to " + ", ".join(immune_type_list) + " moves"# 返回移动类型伤害提示字符串return move_type_damage_prompt# 定义一个类,继承自Player类,实现最大基础伤害玩家
class MaxBasePowerPlayer(Player):# 重写choose_move方法def choose_move(self, battle: AbstractBattle):# 如果存在可用的移动if battle.available_moves:# 选择基础伤害最大的移动best_move = max(battle.available_moves, key=lambda move: move.base_power)return self.create_order(best_move)# 如果没有可用的移动,则随机选择一个移动return self.choose_random_move(battle)# 定义一个类,继承自Player类,实现简单启发式玩家
class SimpleHeuristicsPlayer(Player):# 定义了各种入场危害效果,将字符串映射到对应的SideCondition枚举值ENTRY_HAZARDS = {"spikes": SideCondition.SPIKES,"stealhrock": SideCondition.STEALTH_ROCK,"stickyweb": SideCondition.STICKY_WEB,"toxicspikes": SideCondition.TOXIC_SPIKES,}# 定义了反危害招式,使用集合存储ANTI_HAZARDS_MOVES = {"rapidspin", "defog"}# 定义了速度等级系数SPEED_TIER_COEFICIENT = 0.1# 定义了生命值分数系数HP_FRACTION_COEFICIENT = 0.4# 定义了交换出场匹配阈值SWITCH_OUT_MATCHUP_THRESHOLD = -2# 估算对战情况,返回得分def _estimate_matchup(self, mon: Pokemon, opponent: Pokemon):# 计算对手对我方造成的伤害倍率的最大值score = max([opponent.damage_multiplier(t) for t in mon.types if t is not None])# 减去我方对对手造成的伤害倍率的最大值score -= max([mon.damage_multiplier(t) for t in opponent.types if t is not None])# 根据速度等级差异调整得分if mon.base_stats["spe"] > opponent.base_stats["spe"]:score += self.SPEED_TIER_COEFICIENTelif opponent.base_stats["spe"] > mon.base_stats["spe"]:score -= self.SPEED_TIER_COEFICIENT# 根据生命值分数调整得分score += mon.current_hp_fraction * self.HP_FRACTION_COEFICIENTscore -= opponent.current_hp_fraction * self.HP_FRACTION_COEFICIENTreturn score# 判断是否应该使用极巨化def _should_dynamax(self, battle: AbstractBattle, n_remaining_mons: int):if battle.can_dynamax and self._dynamax_disable is False:# 最后一个满血的精灵if (len([m for m in battle.team.values() if m.current_hp_fraction == 1])== 1and battle.active_pokemon.current_hp_fraction == 1):return True# 有优势且双方都是满血if (self._estimate_matchup(battle.active_pokemon, battle.opponent_active_pokemon)> 0and battle.active_pokemon.current_hp_fraction == 1and battle.opponent_active_pokemon.current_hp_fraction == 1):return True# 只剩下一个精灵if n_remaining_mons == 1:return Truereturn False# 判断是否应该替换出当前精灵def _should_switch_out(self, battle: AbstractBattle):# 获取当前精灵和对手精灵active = battle.active_pokemonopponent = battle.opponent_active_pokemon# 如果有一个适合替换的精灵...if [mfor m in battle.available_switchesif self._estimate_matchup(m, opponent) > 0]:# ...并且有一个“好”的理由替换出去if active.boosts["def"] <= -3 or active.boosts["spd"] <= -3:return Trueif (active.boosts["atk"] <= -3and active.stats["atk"] >= active.stats["spa"]):return Trueif (active.boosts["spa"] <= -3and active.stats["atk"] <= active.stats["spa"]):return Trueif (self._estimate_matchup(active, opponent)< self.SWITCH_OUT_MATCHUP_THRESHOLD):return Truereturn False# 估算精灵的状态def _stat_estimation(self, mon: Pokemon, stat: str):# 计算状态提升值if mon.boosts[stat] > 1:boost = (2 + mon.boosts[stat]) / 2else:boost = 2 / (2 - mon.boosts[stat])return ((2 * mon.base_stats[stat] + 31) + 5) * boost# 计算奖励值def calc_reward(self, current_battle: AbstractBattle) -> float:# 计算奖励值return self.reward_computing_helper(current_battle, fainted_value=2.0, hp_value=1.0, victory_value=30.0)# 根据状态和等级返回加成倍数def boost_multiplier(self, state, level):# 如果状态是准确度if state == "accuracy":# 根据等级返回对应的加成倍数if level == 0:return 1.0if level == 1:return 1.33if level == 2:return 1.66if level == 3:return 2.0if level == 4:return 2.5if level == 5:return 2.66if level == 6:return 3.0if level == -1:return 0.75if level == -2:return 0.6if level == -3:return 0.5if level == -4:return 0.43if level == -5:return 0.36if level == -6:return 0.33# 如果状态不是准确度else:# 根据等级返回对应的加成倍数if level == 0:return 1.0if level == 1:return 1.5if level == 2:return 2.0if level == 3:return 2.5if level == 4:return 3.0if level == 5:return 3.5if level == 6:return 4.0if level == -1:return 0.67if level == -2:return 0.5if level == -3:return 0.4if level == -4:return 0.33if level == -5:return 0.29if level == -6:return 0.25# 检查给定状态的值,并返回相应的状态字符串def check_status(self, status):# 如果状态存在if status:# 根据状态值返回相应的状态字符串if status.value == 1:return "burnt"elif status.value == 2:return "fainted"elif status.value == 3:return "frozen"elif status.value == 4:return "paralyzed"elif status.value == 5:return "poisoned"elif status.value == 7:return "toxic"elif status.value == 6:return "sleeping"# 如果状态不存在,则返回"healthy"else:return "healthy"

.\PokeLLMon\poke_env\player\battle_order.py

# 从 dataclasses 模块中导入 dataclass 装饰器
# 从 typing 模块中导入 Any, List, Optional, Union 类型
# 从 poke_env.environment.double_battle 模块中导入 DoubleBattle 类
# 从 poke_env.environment.move 模块中导入 Move 类
# 从 poke_env.environment.pokemon 模块中导入 Pokemon 类
from dataclasses import dataclass
from typing import Any, List, Optional, Union# 定义一个名为 BattleOrder 的数据类
@dataclass
class BattleOrder:# order 属性可以是 Move 或 Pokemon 类型,初始值为 Noneorder: Optional[Union[Move, Pokemon]]# mega, z_move, dynamax, terastallize, move_target 属性的默认值mega: bool = Falsez_move: bool = Falsedynamax: bool = Falseterastallize: bool = Falsemove_target: int = DoubleBattle.EMPTY_TARGET_POSITION# 默认的指令字符串DEFAULT_ORDER = "/choose default"# 返回对象的字符串表示形式def __str__(self) -> str:return self.message# 返回消息字符串@propertydef message(self) -> str:# 如果 order 是 Move 类型if isinstance(self.order, Move):# 如果 order 的 id 是 "recharge"if self.order.id == "recharge":return "/choose move 1"# 构建消息字符串message = f"/choose move {self.order.id}"if self.mega:message += " mega"elif self.z_move:message += " zmove"elif self.dynamax:message += " dynamax"elif self.terastallize:message += " terastallize"# 如果 move_target 不是空目标位置if self.move_target != DoubleBattle.EMPTY_TARGET_POSITION:message += f" {self.move_target}"return message# 如果 order 是 Pokemon 类型elif isinstance(self.order, Pokemon):return f"/choose switch {self.order.species}"else:return ""# 定义一个名为 DefaultBattleOrder 的类,继承自 BattleOrder 类
class DefaultBattleOrder(BattleOrder):# 初始化方法,不执行任何操作def __init__(self, *args: Any, **kwargs: Any):pass# 返回默认指令字符串@propertydef message(self) -> str:return self.DEFAULT_ORDER# 定义一个名为 DoubleBattleOrder 的数据类,继承自 BattleOrder 类
@dataclass
class DoubleBattleOrder(BattleOrder):# 初始化方法,接受两个可选的 BattleOrder 参数def __init__(self,first_order: Optional[BattleOrder] = None,second_order: Optional[BattleOrder] = None,):self.first_order = first_orderself.second_order = second_order# 返回消息字符串@property# 返回合并后的消息字符串def message(self) -> str:# 如果存在第一和第二指令,则返回两者消息的组合if self.first_order and self.second_order:return (self.first_order.message+ ", "+ self.second_order.message.replace("/choose ", ""))# 如果只存在第一指令,则返回第一指令消息和默认消息的组合elif self.first_order:return self.first_order.message + ", default"# 如果只存在第二指令,则返回第二指令消息和默认消息的组合elif self.second_order:return self.second_order.message + ", default"# 如果都不存在指令,则返回默认指令消息else:return self.DEFAULT_ORDER# 静态方法,用于合并第一和第二指令列表生成双重战斗指令列表@staticmethoddef join_orders(first_orders: List[BattleOrder], second_orders: List[BattleOrder]):# 如果第一和第二指令列表都存在if first_orders and second_orders:# 生成双重战斗指令列表,排除特定条件下的指令orders = [DoubleBattleOrder(first_order=first_order, second_order=second_order)for first_order in first_ordersfor second_order in second_ordersif not first_order.mega or not second_order.megaif not first_order.z_move or not second_order.z_moveif not first_order.dynamax or not second_order.dynamaxif not first_order.terastallize or not second_order.terastallizeif first_order.order != second_order.order]# 如果生成了双重战斗指令列表,则返回该列表if orders:return orders# 如果只存在第一指令列表,则生成只包含第一指令的双重战斗指令列表elif first_orders:return [DoubleBattleOrder(first_order=order) for order in first_orders]# 如果只存在第二指令列表,则生成只包含第二指令的双重战斗指令列表elif second_orders:return [DoubleBattleOrder(first_order=order) for order in second_orders]# 如果两个指令列表都不存在,则返回只包含默认指令的双重战斗指令列表return [DefaultBattleOrder()]
# 定义一个名为ForfeitBattleOrder的类,继承自BattleOrder类
class ForfeitBattleOrder(BattleOrder):# 初始化方法,接受任意数量的位置参数和关键字参数def __init__(self, *args: Any, **kwargs: Any):# pass表示不做任何操作,保持方法的结构完整pass# 定义一个名为message的属性,返回字符串"/forfeit"@propertydef message(self) -> str:return "/forfeit"

.\PokeLLMon\poke_env\player\gpt_player.py

import json  # 导入 json 模块
import os  # 导入 os 模块
import random  # 导入 random 模块
from typing import List  # 导入 List 类型提示
from poke_env.environment.abstract_battle import AbstractBattle  # 导入 AbstractBattle 类
from poke_env.environment.double_battle import DoubleBattle  # 导入 DoubleBattle 类
from poke_env.environment.move_category import MoveCategory  # 导入 MoveCategory 类
from poke_env.environment.pokemon import Pokemon  # 导入 Pokemon 类
from poke_env.environment.side_condition import SideCondition  # 导入 SideCondition 类
from poke_env.player.player import Player, BattleOrder  # 导入 Player 和 BattleOrder 类
from typing import Dict, List, Optional, Union  # 导入 Dict, List, Optional, Union 类型提示
from poke_env.environment.move import Move  # 导入 Move 类
import time  # 导入 time 模块
import json  # 再次导入 json 模块(重复导入)
from openai import OpenAI  # 导入 OpenAI 类
from poke_env.data.gen_data import GenData  # 导入 GenData 类def calculate_move_type_damage_multipier(type_1, type_2, type_chart, constraint_type_list):TYPE_list = 'BUG,DARK,DRAGON,ELECTRIC,FAIRY,FIGHTING,FIRE,FLYING,GHOST,GRASS,GROUND,ICE,NORMAL,POISON,PSYCHIC,ROCK,STEEL,WATER'.split(",")move_type_damage_multiplier_list = []  # 初始化一个空列表,用于存储每种类型的伤害倍率if type_2:  # 如果存在第二种类型for type in TYPE_list:  # 遍历每种类型move_type_damage_multiplier_list.append(type_chart[type_1][type] * type_chart[type_2][type])  # 计算两种类型之间的伤害倍率并添加到列表中move_type_damage_multiplier_dict = dict(zip(TYPE_list, move_type_damage_multiplier_list))  # 将类型和对应的伤害倍率组成字典else:  # 如果只有一种类型move_type_damage_multiplier_dict = type_chart[type_1]  # 直接使用第一种类型的伤害倍率字典effective_type_list = []  # 初始化有效类型列表extreme_type_list = []  # 初始化极效类型列表resistant_type_list = []  # 初始化抵抗类型列表extreme_resistant_type_list = []  # 初始化极度抵抗类型列表immune_type_list = []  # 初始化免疫类型列表for type, value in move_type_damage_multiplier_dict.items():  # 遍历每种类型及其对应的伤害倍率if value == 2:  # 如果伤害倍率为 2effective_type_list.append(type)  # 添加到有效类型列表elif value == 4:  # 如果伤害倍率为 4extreme_type_list.append(type)  # 添加到极效类型列表elif value == 1 / 2:  # 如果伤害倍率为 1/2resistant_type_list.append(type)  # 添加到抵抗类型列表elif value == 1 / 4:  # 如果伤害倍率为 1/4extreme_resistant_type_list.append(type)  # 添加到极度抵抗类型列表elif value == 0:  # 如果伤害倍率为 0immune_type_list.append(type)  # 添加到免疫类型列表else:  # 如果伤害倍率为 1continue  # 继续循环# 如果约束类型列表不为空if constraint_type_list:# 将极端类型列表与约束类型列表的交集作为新的极端类型列表extreme_type_list = list(set(extreme_type_list).intersection(set(constraint_type_list)))# 将有效类型列表与约束类型列表的交集作为新的有效类型列表effective_type_list = list(set(effective_type_list).intersection(set(constraint_type_list)))# 将抗性类型列表与约束类型列表的交集作为新的抗性类型列表resistant_type_list = list(set(resistant_type_list).intersection(set(constraint_type_list)))# 将极端抗性类型列表与约束类型列表的交集作为新的极端抗性类型列表extreme_resistant_type_list = list(set(extreme_resistant_type_list).intersection(set(constraint_type_list)))# 将免疫类型列表与约束类型列表的交集作为新的免疫类型列表immune_type_list = list(set(immune_type_list).intersection(set(constraint_type_list)))# 返回各类型列表的首字母大写形式return (list(map(lambda x: x.capitalize(), extreme_type_list)),list(map(lambda x: x.capitalize(), effective_type_list)),list(map(lambda x: x.capitalize(), resistant_type_list)),list(map(lambda x: x.capitalize(), extreme_resistant_type_list)),list(map(lambda x: x.capitalize(), immune_type_list)))
# 定义一个函数,用于计算给定精灵对应的移动类型伤害提示
def move_type_damage_wraper(pokemon, type_chart, constraint_type_list=None):# 初始化变量,用于存储精灵的两种类型type_1 = Nonetype_2 = None# 如果精灵有第一种类型if pokemon.type_1:# 获取第一种类型的名称type_1 = pokemon.type_1.name# 如果精灵有第二种类型if pokemon.type_2:# 获取第二种类型的名称type_2 = pokemon.type_2.name# 初始化移动类型伤害提示字符串move_type_damage_prompt = ""# 调用函数计算移动类型伤害倍数,得到不同类型的列表extreme_effective_type_list, effective_type_list, resistant_type_list, extreme_resistant_type_list, immune_type_list = calculate_move_type_damage_multipier(type_1, type_2, type_chart, constraint_type_list)# 根据不同类型的列表生成移动类型伤害提示if extreme_effective_type_list:move_type_damage_prompt = (move_type_damage_prompt + " " + ", ".join(extreme_effective_type_list) +f"-type attack is extremely-effective (4x damage) to {pokemon.species}.")if effective_type_list:move_type_damage_prompt = (move_type_damage_prompt + " " + ", ".join(effective_type_list) +f"-type attack is super-effective (2x damage) to {pokemon.species}.")if resistant_type_list:move_type_damage_prompt = (move_type_damage_prompt + " " + ", ".join(resistant_type_list) +f"-type attack is ineffective (0.5x damage) to {pokemon.species}.")if extreme_resistant_type_list:move_type_damage_prompt = (move_type_damage_prompt + " " + ", ".join(extreme_resistant_type_list) +f"-type attack is highly ineffective (0.25x damage) to {pokemon.species}.")if immune_type_list:move_type_damage_prompt = (move_type_damage_prompt + " " + ", ".join(immune_type_list) +f"-type attack is zero effect (0x damage) to {pokemon.species}.")# 返回移动类型伤害提示字符串return move_type_damage_prompt# 定义一个类,继承自Player类
class LLMPlayer(Player):# 使用 OpenAI API 进行对话生成,返回生成的文本def chatgpt(self, system_prompt, user_prompt, model, temperature=0.7, json_format=False, seed=None, stop=[], max_tokens=200) -> str:# 创建 OpenAI 客户端对象client = OpenAI(api_key=self.api_key)# 如果需要返回 JSON 格式的响应if json_format:# 调用 API 完成对话生成,返回 JSON 格式的响应response = client.chat.completions.create(response_format={"type": "json_object"},model=model,messages=[{"role": "system", "content": system_prompt},{"role": "user", "content": user_prompt}],temperature=temperature,stream=False,# seed=seed,stop=stop,max_tokens=max_tokens)else:# 调用 API 完成对话生成response = client.chat.completions.create(model=model,messages=[{"role": "system", "content": system_prompt},{"role": "user", "content": user_prompt}],temperature=temperature,stream=False,# seed=seed,max_tokens=max_tokens,stop=stop)# 获取生成的文本内容outputs = response.choices[0].message.content# 记录完成的 token 数量self.completion_tokens += response.usage.completion_tokens# 记录 prompt 的 token 数量self.prompt_tokens += response.usage.prompt_tokens# 返回生成的文本return outputs# 估算两只精灵之间的对战得分def _estimate_matchup(self, mon: Pokemon, opponent: Pokemon):# 计算对手对该精灵造成的伤害加成中的最大值score = max([opponent.damage_multiplier(t) for t in mon.types if t is not None])# 计算该精灵对对手造成的伤害加成中的最大值score -= max([mon.damage_multiplier(t) for t in opponent.types if t is not None])# 根据速度判断得分if mon.base_stats["spe"] > opponent.base_stats["spe"]:score += self.SPEED_TIER_COEFICIENTelif opponent.base_stats["spe"] > mon.base_stats["spe"]:score -= self.SPEED_TIER_COEFICIENT# 根据当前生命值比例调整得分score += mon.current_hp_fraction * self.HP_FRACTION_COEFICIENTscore -= opponent.current_hp_fraction * self.HP_FRACTION_COEFICIENTreturn score# 判断是否应该使用极巨化def _should_dynamax(self, battle: AbstractBattle):# 统计队伍中剩余未倒下的精灵数量n_remaining_mons = len([m for m in battle.team.values() if m.fainted is False])if battle.can_dynamax and self._dynamax_disable is False:# 如果只剩下一只全血的精灵if (len([m for m in battle.team.values() if m.current_hp_fraction == 1])== 1and battle.active_pokemon.current_hp_fraction == 1):return True# 如果有对战优势且双方都是全血状态if (self._estimate_matchup(battle.active_pokemon, battle.opponent_active_pokemon)> 0and battle.active_pokemon.current_hp_fraction == 1and battle.opponent_active_pokemon.current_hp_fraction == 1):return True# 如果只剩下一只精灵if n_remaining_mons == 1:return Truereturn False# 解析LLM输出,找到JSON内容的起始位置json_start = llm_output.find('{')# 找到JSON内容的结束位置,从后往前找第一个}json_end = llm_output.rfind('}') + 1# 提取JSON内容json_content = llm_output[json_start:json_end]# 将JSON内容加载为Python对象llm_action_json = json.loads(json_content)# 初始化下一个动作为Nonenext_action = None# 如果JSON中包含"move"字段if "move" in llm_action_json.keys():# 获取LLM中的移动ID并处理格式llm_move_id = llm_action_json["move"]llm_move_id = llm_move_id.replace(" ","").replace("-", "")# 遍历可用的移动列表,匹配LLM中的移动IDfor i, move in enumerate(battle.available_moves):if move.id.lower() == llm_move_id.lower():# 创建相应的移动指令next_action = self.create_order(move, dynamax=self._should_dynamax(battle))# 如果JSON中包含"switch"字段elif "switch" in llm_action_json.keys():# 获取LLM中的交换精灵种类并匹配可用的交换精灵列表llm_switch_species = llm_action_json["switch"]for i, pokemon in enumerate(battle.available_switches):if pokemon.species.lower() == llm_switch_species.lower():# 创建相应的交换指令next_action = self.create_order(pokemon)# 如果下一个动作仍为None,则抛出数值错误异常if next_action is None:raise ValueError("Value Error")# 返回下一个动作return next_action# 解析LLM输出,找到JSON内容的起始位置json_start = llm_output.find('{')# 找到JSON内容的结束位置,从后往前找第一个}json_end = llm_output.rfind('}') + 1# 提取JSON内容json_content = llm_output[json_start:json_end]# 将JSON内容转换为Python对象llm_action_json = json.loads(json_content)next_action = None# 获取动作和目标action = llm_action_json["decision"]["action"]target = llm_action_json["decision"]["target"]# 处理目标字符串,去除空格和下划线target = target.replace(" ", "").replace("_", "")# 如果动作是移动if action.lower() == "move":# 遍历可用的移动for i, move in enumerate(battle.available_moves):# 如果移动ID匹配目标if move.id.lower() == target.lower():# 创建移动指令next_action = self.create_order(move, dynamax=self._should_dynamax(battle))# 如果动作是交换elif action.lower() == "switch":# 遍历可用的交换精灵for i, pokemon in enumerate(battle.available_switches):# 如果精灵种类匹配目标if pokemon.species.lower() == target.lower():# 创建交换指令next_action = self.create_order(pokemon)# 如果没有找到下一步动作,抛出数值错误if next_action is None:raise ValueError("Value Error")# 返回下一步动作return next_action# 检查状态并返回对应的字符串def check_status(self, status):if status:if status.value == 1:return "burnt"elif status.value == 2:return "fainted"elif status.value == 3:return "frozen"elif status.value == 4:return "paralyzed"elif status.value == 5:return "poisoned"elif status.value == 7:return "toxic"elif status.value == 6:return "sleeping"else:return ""# 根据状态和等级返回加成倍数def boost_multiplier(self, state, level):# 如果状态是准确度if state == "accuracy":# 根据等级返回对应的加成倍数if level == 0:return 1.0if level == 1:return 1.33if level == 2:return 1.66if level == 3:return 2.0if level == 4:return 2.5if level == 5:return 2.66if level == 6:return 3.0if level == -1:return 0.75if level == -2:return 0.6if level == -3:return 0.5if level == -4:return 0.43if level == -5:return 0.36if level == -6:return 0.33# 如果状态不是准确度else:# 根据等级返回对应的加成倍数if level == 0:return 1.0if level == 1:return 1.5if level == 2:return 2.0if level == 3:return 2.5if level == 4:return 3.0if level == 5:return 3.5if level == 6:return 4.0if level == -1:return 0.67if level == -2:return 0.5if level == -3:return 0.4if level == -4:return 0.33if level == -5:return 0.29if level == -6:return 0.25# 返回战斗摘要信息,包括击败得分、剩余得分、胜利列表和标签列表def battle_summary(self):# 初始化空列表用于存储击败得分、剩余得分、胜利列表和标签列表beat_list = []remain_list = []win_list = []tag_list = []# 遍历每场战斗,计算击败得分、剩余得分、是否胜利以及标签for tag, battle in self.battles.items():beat_score = 0# 计算对手队伍的击败得分for mon in battle.opponent_team.values():beat_score += (1-mon.current_hp_fraction)beat_list.append(beat_score)remain_score = 0# 计算己方队伍的剩余得分for mon in battle.team.values():remain_score += mon.current_hp_fractionremain_list.append(remain_score)# 如果战斗胜利,则在胜利列表中添加1if battle.won:win_list.append(1)tag_list.append(tag)# 返回击败得分列表、剩余得分列表、胜利列表和标签列表return beat_list, remain_list, win_list, tag_list# 辅助计算奖励值的函数def reward_computing_helper(self,battle: AbstractBattle,*,fainted_value: float = 0.0,hp_value: float = 0.0,number_of_pokemons: int = 6,starting_value: float = 0.0,status_value: float = 0.0,victory_value: float = 1.0,) -> float:"""A helper function to compute rewards."""# 如果战斗不在奖励缓冲区中,则将其添加,并设置初始值if battle not in self._reward_buffer:self._reward_buffer[battle] = starting_valuecurrent_value = 0# 遍历我方队伍中的每只精灵for mon in battle.team.values():# 根据当前生命值比例计算当前值current_value += mon.current_hp_fraction * hp_value# 如果精灵已经倒下,则减去倒下值if mon.fainted:current_value -= fainted_value# 如果精灵有异常状态,则减去异常状态值elif mon.status is not None:current_value -= status_value# 根据己方队伍中精灵数量与总精灵数量的差值计算当前值current_value += (number_of_pokemons - len(battle.team)) * hp_value# 遍历对方队伍中的每只精灵for mon in battle.opponent_team.values():# 根据当前生命值比例计算当前值current_value -= mon.current_hp_fraction * hp_value# 如果精灵已经倒下,则加上倒下值if mon.fainted:current_value += fainted_value# 如果精灵有异常状态,则加上异常状态值elif mon.status is not None:current_value += status_value# 根据对方队伍中精灵数量与总精灵数量的差值计算当前值current_value -= (number_of_pokemons - len(battle.opponent_team)) * hp_value# 如果战斗胜利,则加上胜利值if battle.won:current_value += victory_value# 如果战斗失败,则减去胜利值elif battle.lost:current_value -= victory_value# 计算当前值与奖励缓冲区中的值的差值作为返回值to_return = current_value - self._reward_buffer[battle] # the return value is the deltaself._reward_buffer[battle] = current_valuereturn to_returndef choose_max_damage_move(self, battle: AbstractBattle):# 如果有可用的招式,则选择基础威力最大的招式if battle.available_moves:best_move = max(battle.available_moves, key=lambda move: move.base_power)return self.create_order(best_move)# 如果没有可用的招式,则随机选择一个招式return self.choose_random_move(battle)

.\PokeLLMon\poke_env\player\llama_player.py

# 导入所需的模块
from poke_env.player.gpt_player import LLMPlayer
from poke_env.environment.abstract_battle import AbstractBattle
import json
from peft import PeftModel
import transformers
import torch
from poke_env.player.player import BattleOrder# 设置空字符串作为默认的令牌
my_token = ""
# 定义忽略索引
IGNORE_INDEX = -100
# 定义默认的填充令牌
DEFAULT_PAD_TOKEN = "[PAD]"
# 定义默认的结束令牌
DEFAULT_EOS_TOKEN = "</s>"
# 定义默认的开始令牌
DEFAULT_BOS_TOKEN = "<s>"
# 定义默认的未知令牌
DEFAULT_UNK_TOKEN = "<unk>"# 定义 LLAMAPlayer 类,继承自 LLMPlayer
class LLAMAPlayer(LLMPlayer):# 初始化函数,接受多个参数def __init__(self, battle_format,model_name_or_path: str = "",# tokenizer_path: str = "",lora_weights: str = "",model_max_length: int = 2048,w_reason = False,log_dir = "",account_configuration=None,server_configuration=None,):# 调用父类的初始化函数super().__init__(battle_format=battle_format,account_configuration=account_configuration,server_configuration=server_configuration)# 初始化 LLAMA 模型# 加载 LLAMA 模型self.except_cnt = 0self.total_cnt = 0self.log_dir = log_dirself.w_reason = w_reasonself.last_output = Noneself.last_state_prompt = None# 断言确保模型路径已指定assert (model_name_or_path), "Please specify the model path"# 使用指定的模型路径加载 tokenizerself.tokenizer = transformers.AutoTokenizer.from_pretrained(model_name_or_path,model_max_length=model_max_length,padding_side="right",use_fast=False,use_auth_token=my_token)# 使用指定的模型路径加载模型self.model = transformers.AutoModelForCausalLM.from_pretrained(model_name_or_path,load_in_8bit=False,torch_dtype=torch.float16,device_map="auto",use_auth_token=my_token)# 如果有 LoRA 权重,则加载if lora_weights:print("Recover LoRA weights..")self.model = PeftModel.from_pretrained(self.model,lora_weights,torch_dtype=torch.float16,)# 输出加载完成信息print("Loading finished...")# 设置模型为评估模式self.model.eval()

.\PokeLLMon\poke_env\player\openai_api.py

"""This module defines a player class with the OpenAI API on the main thread.
For a black-box implementation consider using the module env_player.
"""
# 导入必要的模块
from __future__ import annotationsimport asyncio
import copy
import random
import time
from abc import ABC, abstractmethod
from logging import Logger
from typing import Any, Awaitable, Callable, Dict, Generic, List, Optional, Tuple, Union# 导入自定义模块
from gymnasium.core import ActType, Env, ObsType
from gymnasium.spaces import Discrete, Space# 导入自定义模块
from poke_env.concurrency import POKE_LOOP, create_in_poke_loop
from poke_env.environment.abstract_battle import AbstractBattle
from poke_env.player.battle_order import BattleOrder, ForfeitBattleOrder
from poke_env.player.player import Player
from poke_env.ps_client import AccountConfiguration
from poke_env.ps_client.server_configuration import (LocalhostServerConfiguration,ServerConfiguration,
)
from poke_env.teambuilder.teambuilder import Teambuilder# 定义一个异步队列类
class _AsyncQueue:def __init__(self, queue: asyncio.Queue[Any]):self.queue = queue# 异步获取队列中的元素async def async_get(self):return await self.queue.get()# 获取队列中的元素def get(self):res = asyncio.run_coroutine_threadsafe(self.queue.get(), POKE_LOOP)return res.result()# 异步向队列中放入元素async def async_put(self, item: Any):await self.queue.put(item)# 向队列中放入元素def put(self, item: Any):task = asyncio.run_coroutine_threadsafe(self.queue.put(item), POKE_LOOP)task.result()# 判断队列是否为空def empty(self):return self.queue.empty()# 阻塞直到队列中的所有元素都被处理def join(self):task = asyncio.run_coroutine_threadsafe(self.queue.join(), POKE_LOOP)task.result()# 异步等待队列中的所有元素都被处理async def async_join(self):await self.queue.join()# 定义一个异步玩家类
class _AsyncPlayer(Generic[ObsType, ActType], Player):actions: _AsyncQueueobservations: _AsyncQueuedef __init__(self,user_funcs: OpenAIGymEnv[ObsType, ActType],username: str,**kwargs: Any,# 定义一个类,继承自AsyncPlayer类):# 设置类名为usernameself.__class__.__name__ = username# 调用父类的初始化方法super().__init__(**kwargs)# 设置类名为"_AsyncPlayer"self.__class__.__name__ = "_AsyncPlayer"# 初始化observations为一个异步队列self.observations = _AsyncQueue(create_in_poke_loop(asyncio.Queue, 1))# 初始化actions为一个异步队列self.actions = _AsyncQueue(create_in_poke_loop(asyncio.Queue, 1))# 初始化current_battle为Noneself.current_battle: Optional[AbstractBattle] = None# 初始化_user_funcs为user_funcs# 定义一个方法,用于选择移动def choose_move(self, battle: AbstractBattle) -> Awaitable[BattleOrder]:# 返回_env_move方法的结果return self._env_move(battle)# 定义一个异步方法,用于处理环境移动async def _env_move(self, battle: AbstractBattle) -> BattleOrder:# 如果当前战斗为空或已结束,则将当前战斗设置为传入的战斗if not self.current_battle or self.current_battle.finished:self.current_battle = battle# 如果当前战斗不等于传入的战斗,则抛出异常if not self.current_battle == battle:raise RuntimeError("Using different battles for queues")# 将战斗嵌入到用户函数中,并异步放入observations队列中battle_to_send = self._user_funcs.embed_battle(battle)await self.observations.async_put(battle_to_send)# 从actions队列中异步获取动作action = await self.actions.async_get()# 如果动作为-1,则返回放弃战斗的指令if action == -1:return ForfeitBattleOrder()# 将动作转换为移动指令并返回return self._user_funcs.action_to_move(action, battle)# 定义一个回调方法,用于处理战斗结束时的操作def _battle_finished_callback(self, battle: AbstractBattle):# 将战斗嵌入到用户函数中,并异步放入observations队列中to_put = self._user_funcs.embed_battle(battle)# 在POKE_LOOP中安全地运行异步放入操作asyncio.run_coroutine_threadsafe(self.observations.async_put(to_put), POKE_LOOP)
# 定义一个元类,继承自 ABC 类型
class _ABCMetaclass(type(ABC)):pass# 定义一个元类,继承自 Env 类型
class _EnvMetaclass(type(Env)):pass# 定义一个元类,继承自 _EnvMetaclass 和 _ABCMetaclass
class _OpenAIGymEnvMetaclass(_EnvMetaclass, _ABCMetaclass):pass# 定义一个类 OpenAIGymEnv,继承自 Env[ObsType, ActType] 和 ABC 类型,使用 _OpenAIGymEnvMetaclass 元类
class OpenAIGymEnv(Env[ObsType, ActType],ABC,metaclass=_OpenAIGymEnvMetaclass,
):"""Base class implementing the OpenAI Gym API on the main thread."""# 初始化重试次数_INIT_RETRIES = 100# 重试之间的时间间隔_TIME_BETWEEN_RETRIES = 0.5# 切换挑战任务的重试次数_SWITCH_CHALLENGE_TASK_RETRIES = 30# 切换重试之间的时间间隔_TIME_BETWEEN_SWITCH_RETIRES = 1# 初始化方法def __init__(self,account_configuration: Optional[AccountConfiguration] = None,*,avatar: Optional[int] = None,battle_format: str = "gen8randombattle",log_level: Optional[int] = None,save_replays: Union[bool, str] = False,server_configuration: Optional[ServerConfiguration] = LocalhostServerConfiguration,start_timer_on_battle_start: bool = False,start_listening: bool = True,ping_interval: Optional[float] = 20.0,ping_timeout: Optional[float] = 20.0,team: Optional[Union[str, Teambuilder]] = None,start_challenging: bool = False,# 抽象方法,计算奖励@abstractmethoddef calc_reward(self, last_battle: AbstractBattle, current_battle: AbstractBattle) -> float:"""Returns the reward for the current battle state. The battle state in the previousturn is given as well and can be used for comparisons.:param last_battle: The battle state in the previous turn.:type last_battle: AbstractBattle:param current_battle: The current battle state.:type current_battle: AbstractBattle:return: The reward for current_battle.:rtype: float"""pass# 抽象方法@abstractmethod# 根据给定的动作和当前战斗状态返回相应的战斗指令def action_to_move(self, action: int, battle: AbstractBattle) -> BattleOrder:"""Returns the BattleOrder relative to the given action.:param action: The action to take.:type action: int:param battle: The current battle state:type battle: AbstractBattle:return: The battle order for the given action in context of the current battle.:rtype: BattleOrder"""pass# 返回当前战斗状态的嵌入,格式与OpenAI gym API兼容@abstractmethoddef embed_battle(self, battle: AbstractBattle) -> ObsType:"""Returns the embedding of the current battle state in a format compatible withthe OpenAI gym API.:param battle: The current battle state.:type battle: AbstractBattle:return: The embedding of the current battle state."""pass# 返回嵌入的描述,必须返回一个指定了下限和上限的Space@abstractmethoddef describe_embedding(self) -> Space[ObsType]:"""Returns the description of the embedding. It must return a Space specifyinglow bounds and high bounds.:return: The description of the embedding.:rtype: Space"""pass# 返回动作空间的大小,如果大小为x,则动作空间从0到x-1@abstractmethoddef action_space_size(self) -> int:"""Returns the size of the action space. Given size x, the action space goesfrom 0 to x - 1.:return: The action space size.:rtype: int"""pass# 返回将在挑战循环的下一次迭代中挑战的对手(或对手列表)# 如果返回一个列表,则在挑战循环期间将随机选择一个元素@abstractmethoddef get_opponent(self,) -> Union[Player, str, List[Player], List[str]]:"""Returns the opponent (or list of opponents) that will be challengedon the next iteration of the challenge loop. If a list is returned,a random element will be chosen at random during the challenge loop.:return: The opponent (or list of opponents).:rtype: Player or str or list(Player) or list(str)"""pass# 获取对手玩家或字符串def _get_opponent(self) -> Union[Player, str]:# 获取对手opponent = self.get_opponent()# 如果对手是列表,则随机选择一个对手,否则直接返回对手random_opponent = (random.choice(opponent) if isinstance(opponent, list) else opponent)return random_opponent# 重置环境def reset(self,*,seed: Optional[int] = None,options: Optional[Dict[str, Any]] = None,) -> Tuple[ObsType, Dict[str, Any]]:# 如果有种子值,则使用种子值重置环境if seed is not None:super().reset(seed=seed)  # type: ignoreself._seed_initialized = True# 如果种子值未初始化,则使用当前时间戳作为种子值elif not self._seed_initialized:super().reset(seed=int(time.time()))  # type: ignoreself._seed_initialized = True# 如果当前没有对战,则等待对战开始if not self.agent.current_battle:count = self._INIT_RETRIESwhile not self.agent.current_battle:if count == 0:raise RuntimeError("Agent is not challenging")count -= 1time.sleep(self._TIME_BETWEEN_RETRIES)# 如果当前对战未结束,则等待对战结束if self.current_battle and not self.current_battle.finished:if self.current_battle == self.agent.current_battle:self._actions.put(-1)self._observations.get()else:raise RuntimeError("Environment and agent aren't synchronized. Try to restart")# 等待当前对战与对手对战不同while self.current_battle == self.agent.current_battle:time.sleep(0.01)# 更新当前对战为对手对战self.current_battle = self.agent.current_battlebattle = copy.copy(self.current_battle)battle.logger = Noneself.last_battle = copy.deepcopy(battle)return self._observations.get(), self.get_additional_info()# 获取额外信息def get_additional_info(self) -> Dict[str, Any]:"""Returns additional info for the reset method.Override only if you really need it.:return: Additional information as a Dict:rtype: Dict"""return {}def step(self, action: ActType) -> Tuple[ObsType, float, bool, bool, Dict[str, Any]]:"""Execute the specified action in the environment.:param ActType action: The action to be executed.:return: A tuple containing the new observation, reward, termination flag, truncation flag, and info dictionary.:rtype: Tuple[ObsType, float, bool, bool, Dict[str, Any]]"""# 如果当前战斗为空,则重置环境并返回初始观察和信息if not self.current_battle:obs, info = self.reset()return obs, 0.0, False, False, info# 如果当前战斗已经结束,则抛出异常if self.current_battle.finished:raise RuntimeError("Battle is already finished, call reset")# 复制当前战斗对象,以便进行操作battle = copy.copy(self.current_battle)battle.logger = None# 深度复制当前战斗对象,用于记录上一次的战斗状态self.last_battle = copy.deepcopy(battle)# 将动作放入动作队列self._actions.put(action)# 从观察队列中获取观察结果observation = self._observations.get()# 计算奖励reward = self.calc_reward(self.last_battle, self.current_battle)terminated = Falsetruncated = False# 如果当前战斗已经结束if self.current_battle.finished:size = self.current_battle.team_size# 计算剩余队伍中未被击倒的精灵数量remaining_mons = size - len([mon for mon in self.current_battle.team.values() if mon.fainted])remaining_opponent_mons = size - len([monfor mon in self.current_battle.opponent_team.values()if mon.fainted])# 如果一方队伍的精灵全部被击倒,则游戏结束if (remaining_mons == 0) != (remaining_opponent_mons == 0):terminated = Trueelse:truncated = True# 返回观察结果、奖励、游戏是否结束、游戏是否截断以及额外信息return observation, reward, terminated, truncated, self.get_additional_info()# 渲染当前战斗状态,显示当前回合信息和双方精灵状态def render(self, mode: str = "human"):# 如果当前存在战斗if self.current_battle is not None:# 打印当前回合信息和双方精灵状态print("  Turn %4d. | [%s][%3d/%3dhp] %10.10s - %10.10s [%3d%%hp][%s]"% (self.current_battle.turn,"".join(["⦻" if mon.fainted else "●"for mon in self.current_battle.team.values()]),self.current_battle.active_pokemon.current_hp or 0,self.current_battle.active_pokemon.max_hp or 0,self.current_battle.active_pokemon.species,self.current_battle.opponent_active_pokemon.species,self.current_battle.opponent_active_pokemon.current_hp or 0,"".join(["⦻" if mon.fainted else "●"for mon in self.current_battle.opponent_team.values()]),),end="\n" if self.current_battle.finished else "\r",)# 关闭当前战斗,清理资源def close(self, purge: bool = True):# 如果当前没有战斗或者当前战斗已结束if self.current_battle is None or self.current_battle.finished:# 等待1秒time.sleep(1)# 如果当前战斗不是代理的当前战斗if self.current_battle != self.agent.current_battle:self.current_battle = self.agent.current_battle# 创建一个异步任务来停止挑战循环closing_task = asyncio.run_coroutine_threadsafe(self._stop_challenge_loop(purge=purge), POKE_LOOP)# 获取异步任务的结果closing_task.result()def background_send_challenge(self, username: str):"""Sends a single challenge to a specified player asynchronously. The function immediately returnsto allow use of the OpenAI gym API.:param username: The username of the player to challenge.:type username: str"""# 检查是否已经有挑战任务在进行,如果有则抛出异常if self._challenge_task and not self._challenge_task.done():raise RuntimeError("Agent is already challenging opponents with the challenging loop. ""Try to specify 'start_challenging=True' during instantiation or call ""'await agent.stop_challenge_loop()' to clear the task.")# 在另一个线程中异步运行发送挑战的方法self._challenge_task = asyncio.run_coroutine_threadsafe(self.agent.send_challenges(username, 1), POKE_LOOP)def background_accept_challenge(self, username: str):"""Accepts a single challenge from a specified player asynchronously. The function immediately returnsto allow use of the OpenAI gym API.:param username: The username of the player to challenge.:type username: str"""# 检查是否已经有挑战任务在进行,如果有则抛出异常if self._challenge_task and not self._challenge_task.done():raise RuntimeError("Agent is already challenging opponents with the challenging loop. ""Try to specify 'start_challenging=True' during instantiation or call ""'await agent.stop_challenge_loop()' to clear the task.")# 在另一个线程中异步运行接受挑战的方法self._challenge_task = asyncio.run_coroutine_threadsafe(self.agent.accept_challenges(username, 1, self.agent.next_team), POKE_LOOP)async def _challenge_loop(self,n_challenges: Optional[int] = None,callback: Optional[Callable[[AbstractBattle], None]] = None,# 如果没有指定挑战次数,则持续挑战直到 self._keep_challenging 为 False):# 如果没有挑战次数且 self._keep_challenging 为 Trueif not n_challenges:# 持续挑战直到 self._keep_challenging 为 Falsewhile self._keep_challenging:# 获取对手opponent = self._get_opponent()# 如果对手是 Player 类型if isinstance(opponent, Player):# 进行一场对战await self.agent.battle_against(opponent, 1)else:# 发送挑战请求await self.agent.send_challenges(opponent, 1)# 如果有回调函数且当前对战不为 Noneif callback and self.current_battle is not None:# 复制当前对战并调用回调函数callback(copy.deepcopy(self.current_battle))# 如果指定了挑战次数且挑战次数大于 0elif n_challenges > 0:# 循环指定次数for _ in range(n_challenges):# 获取对手opponent = self._get_opponent()# 如果对手是 Player 类型if isinstance(opponent, Player):# 进行一场对战await self.agent.battle_against(opponent, 1)else:# 发送挑战请求await self.agent.send_challenges(opponent, 1)# 如果有回调函数且当前对战不为 Noneif callback and self.current_battle is not None:# 复制当前对战并调用回调函数callback(copy.deepcopy(self.current_battle))# 如果挑战次数小于等于 0else:# 抛出数值错误异常raise ValueError(f"Number of challenges must be > 0. Got {n_challenges}")# 开始挑战def start_challenging(# 指定挑战次数,默认为 Noneself,n_challenges: Optional[int] = None,# 回调函数,接受 AbstractBattle 类型参数并返回 Nonecallback: Optional[Callable[[AbstractBattle], None]] = None,):"""Starts the challenge loop.:param n_challenges: The number of challenges to send. If empty it will run untilstopped.:type n_challenges: int, optional:param callback: The function to callback after each challenge with a copy ofthe final battle state.:type callback: Callable[[AbstractBattle], None], optional"""# 检查是否存在正在进行的挑战任务,如果有则等待直到完成if self._challenge_task and not self._challenge_task.done():count = self._SWITCH_CHALLENGE_TASK_RETRIESwhile not self._challenge_task.done():if count == 0:raise RuntimeError("Agent is already challenging")count -= 1time.sleep(self._TIME_BETWEEN_SWITCH_RETIRES)# 如果没有指定挑战次数,则设置为持续挑战if not n_challenges:self._keep_challenging = True# 启动挑战循环任务self._challenge_task = asyncio.run_coroutine_threadsafe(self._challenge_loop(n_challenges, callback), POKE_LOOP)async def _ladder_loop(self,n_challenges: Optional[int] = None,callback: Optional[Callable[[AbstractBattle], None]] = None,):# 如果指定了挑战次数,则进行相应次数的挑战if n_challenges:if n_challenges <= 0:raise ValueError(f"Number of challenges must be > 0. Got {n_challenges}")for _ in range(n_challenges):await self.agent.ladder(1)# 如果有回调函数且当前战斗状态不为空,则执行回调函数if callback and self.current_battle is not None:callback(copy.deepcopy(self.current_battle))# 如果未指定挑战次数,则持续挑战直到停止else:while self._keep_challenging:await self.agent.ladder(1)# 如果有回调函数且当前战斗状态不为空,则执行回调函数if callback and self.current_battle is not None:callback(copy.deepcopy(self.current_battle))# 启动 ladder 循环挑战def start_laddering(self,n_challenges: Optional[int] = None,callback: Optional[Callable[[AbstractBattle], None]] = None,):"""Starts the laddering loop.:param n_challenges: The number of ladder games to play. If empty itwill run until stopped.:type n_challenges: int, optional:param callback: The function to callback after each challenge with acopy of the final battle state.:type callback: Callable[[AbstractBattle], None], optional"""# 检查是否存在正在进行的挑战任务,如果有则等待直到完成if self._challenge_task and not self._challenge_task.done():count = self._SWITCH_CHALLENGE_TASK_RETRIESwhile not self._challenge_task.done():if count == 0:raise RuntimeError("Agent is already challenging")count -= 1time.sleep(self._TIME_BETWEEN_SWITCH_RETIRES)# 如果没有指定挑战次数,则设置为持续挑战if not n_challenges:self._keep_challenging = True# 使用 asyncio 在另一个线程中运行 _ladder_loop 方法,传入挑战次数和回调函数self._challenge_task = asyncio.run_coroutine_threadsafe(self._ladder_loop(n_challenges, callback), POKE_LOOP)async def _stop_challenge_loop(self, force: bool = True, wait: bool = True, purge: bool = False):  # 定义一个方法,接受多个参数self._keep_challenging = False  # 将属性_keep_challenging设置为Falseif force:  # 如果force为真if self.current_battle and not self.current_battle.finished:  # 如果存在当前战斗且未结束if not self._actions.empty():  # 如果_actions队列不为空await asyncio.sleep(2)  # 异步等待2秒if not self._actions.empty():  # 如果_actions队列仍不为空raise RuntimeError(  # 抛出运行时错误"The agent is still sending actions. ""Use this method only when training or ""evaluation are over.")if not self._observations.empty():  # 如果_observations队列不为空await self._observations.async_get()  # 异步获取_observations队列中的数据await self._actions.async_put(-1)  # 异步将-1放入_actions队列中if wait and self._challenge_task:  # 如果wait为真且_challenge_task存在while not self._challenge_task.done():  # 当_challenge_task未完成时await asyncio.sleep(1)  # 异步等待1秒self._challenge_task.result()  # 获取_challenge_task的结果self._challenge_task = None  # 将_challenge_task设置为Noneself.current_battle = None  # 将current_battle设置为Noneself.agent.current_battle = None  # 将agent的current_battle设置为Nonewhile not self._actions.empty():  # 当_actions队列不为空时await self._actions.async_get()  # 异步获取_actions队列中的数据while not self._observations.empty():  # 当_observations队列不为空时await self._observations.async_get()  # 异步获取_observations队列中的数据if purge:  # 如果purge为真self.agent.reset_battles()  # 调用agent的reset_battles方法def reset_battles(self):  # 定义一个方法reset_battles"""Resets the player's inner battle tracker."""  # 重置玩家的内部战斗追踪器self.agent.reset_battles()  # 调用agent的reset_battles方法# 检查任务是否完成,可设置超时时间def done(self, timeout: Optional[int] = None) -> bool:"""Returns True if the task is done or is done after the timeout, false otherwise.:param timeout: The amount of time to wait for if the task is not already done.If empty it will wait until the task is done.:type timeout: int, optional:return: True if the task is done or if the task gets completed after thetimeout.:rtype: bool"""# 如果挑战任务为空,则返回Trueif self._challenge_task is None:return True# 如果超时时间为空,则等待任务完成if timeout is None:self._challenge_task.result()return True# 如果挑战任务已完成,则返回Trueif self._challenge_task.done():return True# 等待一段时间后再次检查任务是否完成time.sleep(timeout)return self._challenge_task.done()# 暴露Player类的属性@propertydef battles(self) -> Dict[str, AbstractBattle]:return self.agent.battles@propertydef format(self) -> str:return self.agent.format@propertydef format_is_doubles(self) -> bool:return self.agent.format_is_doubles@propertydef n_finished_battles(self) -> int:return self.agent.n_finished_battles@propertydef n_lost_battles(self) -> int:return self.agent.n_lost_battles@propertydef n_tied_battles(self) -> int:return self.agent.n_tied_battles@propertydef n_won_battles(self) -> int:return self.agent.n_won_battles@propertydef win_rate(self) -> float:return self.agent.win_rate# 暴露Player Network Interface Class的属性@propertydef logged_in(self) -> asyncio.Event:"""Event object associated with user login.:return: The logged-in event:rtype: Event"""return self.agent.ps_client.logged_in@property# 返回与玩家相关联的日志记录器def logger(self) -> Logger:"""Logger associated with the player.:return: The logger.:rtype: Logger"""return self.agent.logger# 返回玩家的用户名@propertydef username(self) -> str:"""The player's username.:return: The player's username.:rtype: str"""return self.agent.username# 返回 WebSocket 的 URL@propertydef websocket_url(self) -> str:"""The websocket url.It is derived from the server url.:return: The websocket url.:rtype: str"""return self.agent.ps_client.websocket_url# 获取属性的值def __getattr__(self, item: str):return getattr(self.agent, item)

相关文章:

PokéLLMon 源码解析(四)

.\PokeLLMon\poke_env\exceptions.py """ This module contains exceptions. """# 定义一个自定义异常类 ShowdownException&#xff0c;继承自内置异常类 Exception class ShowdownException(Exception):"""This exception is …...

区块链基础知识01

区块链&#xff1a;区块链技术是一种高级数据库机制&#xff0c;允许在企业网络中透明地共享信息。区块链数据库将数据存储在区块中&#xff0c;而数据库则一起链接到一个链条中。数据在时间上是一致的&#xff0c;在没有网络共识的情况下&#xff0c;不能删除或修改链条。 即&…...

YOLOv9(2):YOLOv9网络结构

1. 前言 本文仅以官方提供的yolov9.yaml来进行简要讲解。 讲解之前&#xff0c;还是要做一些简单的铺垫。 Slice层不做任何的操作&#xff0c;纯粹是做一个占位层。这样一来&#xff0c;在parse_model时&#xff0c;ch[n]可表示第n层的输出通道。 Detect和DDetect主要区别还…...

提取b站字幕(视频字幕、AI字幕)

提取b站字幕&#xff08;视频字幕、AI字幕&#xff09; 1. 打开视频 2. 按 F12 进行开发者界面 视频自己的紫米输入的是 json&#xff0c;如果是AI字幕则需要输入 ai_subtitle 3. 进入这个网址&#xff1a;https://www.dreamlyn.cn/bsrt...

JAVA程序员如何快速熟悉新项目?

文章目录 Java程序员快速熟悉一个新项目的步骤通常包括以下几个方面&#xff1a;实例展示&#xff1a;Java程序员加入新项目时可能遇到的技术难题及其解决方案包括&#xff1a; Java程序员快速熟悉一个新项目的步骤通常包括以下几个方面&#xff1a; 理解项目背景和目标&#x…...

慢sql优化记录1

慢sql为&#xff1a; select count(*) from t_wf_process p left join t_wf_core_dofile dofile on p.wf_instance_uid dofile.instanceid join zwkj_department d on p.userdeptid d.department_guid ,t_wf_core_item i,wf_node n where (p.IS_DUPLICATE ! true or p.IS_DU…...

堆和堆排序

堆排序是一种与插入排序和并归排序十分不同的算法。 优先级队列 Priority Queue 优先级队列是类似于常规队列或堆栈数据结构的抽象数据类型&#xff08;ADT&#xff09;。优先级队列中的每个元素都有一个相关联的优先级key。在优先级队列中&#xff0c;高优先级的元素优先于…...

STM32 | 零基础 STM32 第一天

零基础 STM32 第一天 一、认知STM32 1、STM32概念 STM32:意法半导体基于ARM公司的Cortex-M内核开发的32位的高性能、低功耗单片机。 ST:意法半导体 M:基于ARM公司的Cortex-M内核的高性能、低功耗单片机 32&#xff1a;32位单片机 2、STM32开发的产品 STM32开发的产品&a…...

day16_购物车(添加购物车,购物车列表查询,删除购物车商品,更新选中商品状态,完成购物车商品的全选,清空购物车)

文章目录 购物车模块1 需求说明2 环境搭建3 添加购物车3.1 需求说明3.2 远程调用接口开发3.2.1 ProductController3.2.2 ProductService 3.3 openFeign接口定义3.3.1 环境搭建3.3.2 接口定义3.3.3 降级类定义 3.4 业务后端接口开发3.4.1 添加依赖3.4.2 修改启动类3.4.3 CartInf…...

基于Spring Boot的图书个性化推荐系统 ,计算机毕业设计(带源码+论文)

源码获取地址&#xff1a; 码呢-一个专注于技术分享的博客平台一个专注于技术分享的博客平台,大家以共同学习,乐于分享,拥抱开源的价值观进行学习交流http://www.xmbiao.cn/resource-details/1765769136268455938...

libevent源码解析:定时器事件(三)

文章目录 前言一、用例小根堆管理定时器事件小根堆和链表管理定时器事件区别 二、基本数据结构介绍结构体成员分析小根堆和链表common_timeout图示 三、源码分析小根堆管理定时器事件event_newevent_addevent_dispatch 链表common_timeout管理定时器事件event_base_init_common…...

3D资产管理

3D 资产管理是指组织、跟踪、优化和分发 3D 模型和资产以用于游戏、电影、AR/VR 体验等各种应用的过程。 3D资产管理也称为3D内容管理。 随着游戏、电影、建筑、工程等行业中 3D 内容的增长&#xff0c;实施有效的资产管理工作流程对于提高生产力、减少错误、简化工作流程以及使…...

鸿蒙Harmony应用开发—ArkTS声明式开发(基础手势:Blank)

空白填充组件&#xff0c;在容器主轴方向上&#xff0c;空白填充组件具有自动填充容器空余部分的能力。仅当父组件为Row/Column/Flex时生效。 说明&#xff1a; 该组件从API Version 7开始支持。后续版本如有新增内容&#xff0c;则采用上角标单独标记该内容的起始版本。 子组件…...

【手游联运平台搭建】游戏平台的作用

随着科技的不断发展&#xff0c;游戏行业也在不断壮大&#xff0c;而游戏平台作为连接玩家与游戏的桥梁&#xff0c;发挥着越来越重要的作用。游戏平台不仅为玩家提供了便捷的游戏体验&#xff0c;还为游戏开发者提供了广阔的市场和推广渠道。本文将从多个方面探讨游戏平台的作…...

手把手教会你 - StreamAPI基本用法

1. 简介 目前响应式编程的学习中很多时候都用到了Lambda表达式和StreamAPI&#xff0c;那么今天就在这里记录一下一些最基本的使用方法。 StreamAPI中引入了流的概念&#xff0c;其将集合看作一种流&#xff0c;流在管道中传输&#xff08;动态的&#xff09;&#xff0c;可以…...

和为K的子数组

题目&#xff1a; 使用前缀和的方法可以解决这个问题&#xff0c;因为我们需要找到和为k的连续子数组的个数。通过计算前缀和&#xff0c;我们可以将问题转化为求解两个前缀和之差等于k的情况。 假设数组的前缀和数组为prefixSum&#xff0c;其中prefixSum[i]表示从数组起始位…...

Redis:java中redis的基本使用(springboot)

文章目录 springboot中使用redisspringboot 连接 redis三种方式导入依赖增删改查小练习 springboot中使用redis springboot 连接 redis三种方式 jedis &#xff08;redis官方提供的&#xff09;springboot自带的redisson (基于jedis优化的&#xff0c;性能最好&#xff0c;使…...

微型计算机技术

摘要&#xff1a;微型计算机是通用计算机的一个重要发展分支,自1981年美国IBM公司推出第一代商用微型计算机以来,微型计算机迅速进入社会各个领域,且技术不断更新、产品快速换代,已成为人们工作和生活中不可缺少的基本工具。 一、微型计算机技术发展历史 1.第一代微处理器(19…...

mysql下载教程

什么是mysql MySQL是一种开源的关系型数据库管理系统&#xff0c;由瑞典MySQL AB公司开发&#xff0c;现在由Oracle公司维护。MySQL支持多个操作系统&#xff0c;包括Linux、Windows、macOS等。它是一种客户端/服务器模式的数据库&#xff0c;提供高效、可靠、稳定的数据存储和…...

ResponseStatusException

目录 概述&#xff1a; 综合实例&#xff1a; 继承 ResponseStatusException-自定义异常类 继承 ResponseStatusException-自定义响应头信息 继承 ResponseStatusException-定制更多异常处理逻辑 继承 ResponseStatusException-根据异常发生的上下文动态改变 HTTP 状态码…...

第五十二回 戴宗二取公孙胜 李逵独劈罗真人-飞桨AI框架安装和使用示例

吴用说只有公孙胜可以破法术&#xff0c;于是宋江请戴宗和李逵去蓟州。两人听说公孙胜的师傅罗真人在九宫县二仙山讲经&#xff0c;于是到了二仙山&#xff0c;并在山下找到了公孙胜的家。 两人请公孙胜去帮助打高唐州&#xff0c;公孙胜说听师傅的。罗真人说出家人不管闲事&a…...

CSAPP-程序的机器级表示

文章目录 概念扫盲思想理解经典好图安全事件 概念扫盲 1.汇编代码使用文本格式&#xff0c;相较于汇编的二进制可读性更好 2.程序内存包括&#xff1a;可执行的机器代码、操作系统需要的信息、管理过程调用和返回的运行时栈、用户分配的内存块 3.链接器为函数调用找到匹配的可…...

TCP传输收发

TCP通信: TCP发端: socket connect send recv close TCP收端: socket bind listen accept send recv close 1.connect int connect(int sockfd, const struct sockaddr *addr, socklen_t ad…...

OJ习题之——圆括号编码

圆括号编码 1.题目描述2.完整代码3.图例演示 1.题目描述 题目描述 令Ss1 s2 …sn是一个规则的圆括号字符串。S以2种不同形式编码&#xff1a; &#xff08;1&#xff09;用一个整数序列Pp1 p2 … pn编码&#xff0c;pi代表在S中第i个右圆括号的左圆括号数量。&#xff08;记为…...

Android耗电分析之Battery Historian工具使用

Battery-Historian是谷歌推出的一款专门分析Bugreport的工具&#xff0c;是谷歌在2015年I/O大会上推出的一款检测运行在android5.0(Lollipop)及以后版本的设备上电池的相关信息和事件的工具&#xff0c;是一款对于分析手机状态&#xff0c;历史运行情况很好的可视化分析工具。 …...

vue el-avatar 使用require提示无法找到图片

报错信息 错误代码 问题分析 vue初始化DOM树时没有挂载数据,导致无法找到模块 解决方案...

深入理解 C# 中的 Task:异步编程的利器

深入理解 C# 中的 Task&#xff1a;异步编程的利器 前言一、Task 的基本概念什么是 Task&#xff1f;为什么要使用 Task&#xff1f; Task 的使用方法创建 Task等待 Task 完成Task 返回结果 Task 的进阶用法Task 异常处理Task 同步执行Task 并发限制 Task 的实际应用场景并行计…...

YOLOv9电动车头盔佩戴检测,详细讲解模型训练

向AI转型的程序员都关注了这个号&#x1f447;&#x1f447;&#x1f447; 一、YOLOv9简介 YOLOv9是YOLO系列算法的最新版本。YOLO系列算法自2015年首次提出以来&#xff0c;已经在目标检测领域取得了显著的进展&#xff0c;以其快速和准确的特点而广受欢迎。 论文地址&#xf…...

OpenStack之Nova

一 、Nova 使用OpenStack Compute来托管和管理云计算系统。 OpenStack Compute是基础架构即服务 &#xff08;IaaS&#xff09;系统的主要部分。 主要模块在Python中实现&#xff1a; 1因为认证&#xff0c;与OpenStack 身份认证keystone 交互。 2因为磁盘和服务器镜像&#xf…...

虽说主业搞前端,看到如此漂亮的网页UI,也是挪不开眼呀。

漂亮的网页UI能够吸引人的眼球&#xff0c;给人留下深刻的印象。作为前端开发人员&#xff0c;可以通过不断学习和掌握设计技巧和工具&#xff0c;提升自己的UI设计能力&#xff0c;为用户提供更好的视觉体验。 以下是一些提升网页UI设计能力的建议&#xff1a; 学习设计基础知…...