autowzry-agent Development Guide¶
Version: v1.0  Date: 2025-01-12
Development Environment Setup¶
System Requirements¶
- Python 3.8+
- CUDA 11.0+ (optional, for GPU acceleration)
- ADB (Android Debug Bridge)
- An Android device or emulator with Honor of Kings (王者荣耀) installed
Installing Dependencies¶
```bash
# Create a virtual environment
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt
```
Contents of requirements.txt:
```
torch>=2.0.0
torchvision>=0.15.0
numpy>=1.21.0
opencv-python>=4.5.0
h5py>=3.7.0
PyYAML>=6.0
tensorboard>=2.10.0   # optional
pytesseract>=0.3.10   # optional, for OCR
```
Development Workflow¶
Step 1: Create the Project Structure¶
```bash
# Run the following commands to create the directory structure
mkdir -p config core environment data utils scripts
mkdir -p data/episodes models/checkpoints logs templates
touch config/__init__.py core/__init__.py environment/__init__.py
touch data/__init__.py utils/__init__.py scripts/__init__.py
```
Step 2: Develop the Configuration Module¶
2.1 Define the Configuration Classes¶
File: config/config.py
Development steps:
1. Define a HyperParameters dataclass
   - List all hyperparameters
   - Set reasonable default values
2. Define a PathConfig dataclass
   - Define all path constants
3. Define an EnvironmentConfig dataclass
   - Define environment-related settings
4. Implement the Config class (see the sketch after this list)
   - from_yaml(): read the file with yaml.safe_load()
   - save_yaml(): save with yaml.dump()
   - from_args(): parse command-line arguments with argparse
   - create_argument_parser(): define all arguments
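As a reference, here is a minimal sketch of what these classes could look like. The field names and default values are illustrative placeholders; the real ones should follow default_config.yaml. Methods such as generate_default_config(), save_yaml(), from_args(), and create_argument_parser() are omitted from the sketch.
```python
from dataclasses import dataclass, field
import yaml

@dataclass
class HyperParameters:
    input_channels: int = 4
    learning_rate: float = 1e-4
    batch_size: int = 32

@dataclass
class PathConfig:
    data_dir: str = './data'
    episodes_dir: str = './data/episodes'
    models_dir: str = './models/checkpoints'

@dataclass
class EnvironmentConfig:
    mode: str = 'spectate'

@dataclass
class Config:
    hyperparams: HyperParameters = field(default_factory=HyperParameters)
    paths: PathConfig = field(default_factory=PathConfig)
    environment: EnvironmentConfig = field(default_factory=EnvironmentConfig)

    @classmethod
    def from_yaml(cls, path):
        # Map the three top-level YAML sections onto the dataclasses
        with open(path, 'r', encoding='utf-8') as f:
            raw = yaml.safe_load(f) or {}
        return cls(
            hyperparams=HyperParameters(**raw.get('hyperparameters', {})),
            paths=PathConfig(**raw.get('paths', {})),
            environment=EnvironmentConfig(**raw.get('environment', {})),
        )
```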
2.2 Create the Default Configuration File¶
File: config/default_config.yaml
Structure:
```yaml
hyperparameters:
  input_channels: 4
  learning_rate: 0.0001
  # ... other hyperparameters
paths:
  data_dir: ./data
  # ... other paths
environment:
  mode: spectate
  # ... other settings
```
2.3 Test the Configuration Module¶
```python
# Test script
from config.config import Config

# Generate a default config
Config.generate_default_config('config/test_config.yaml')

# Load the config
config = Config.from_yaml('config/test_config.yaml')
print(config.hyperparams.learning_rate)

# Test command-line arguments
config, args = Config.from_args(['--batch-size', '64'])
print(config.hyperparams.batch_size)  # should be 64
```
Step 3: Develop the Utility Modules¶
3.1 Image Preprocessing¶
File: utils/image_processing.py
Development steps:
1. Implement resize_frame()
```python
import cv2
import numpy as np

def resize_frame(frame, width, height):
    return cv2.resize(frame, (width, height))
```
2. Implement to_grayscale()
```python
def to_grayscale(frame):
    return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
```
3. Implement normalize()
```python
def normalize(frame):
    return frame.astype(np.float32) / 255.0
```
4. Implement stack_frames()
```python
def stack_frames(frames):
    return np.stack(frames, axis=0)
```
5. Implement preprocess_frame()
```python
def preprocess_frame(frame, width=84, height=84, grayscale=True):
    frame = resize_frame(frame, width, height)
    if grayscale:
        frame = to_grayscale(frame)
    frame = normalize(frame)
    return frame
```
Test:
```python
import numpy as np
from utils.image_processing import preprocess_frame

# Simulate a frame
frame = np.random.randint(0, 255, (1920, 1080, 3), dtype=np.uint8)
processed = preprocess_frame(frame)
print(processed.shape)                   # should be (84, 84)
print(processed.min(), processed.max())  # should be within [0, 1]
```
3.2 Logging Utility¶
File: utils/logger.py
Development steps:
1. Implement basic logging
```python
import logging
import os

class Logger:
    def __init__(self, log_dir, use_tensorboard=False):
        os.makedirs(log_dir, exist_ok=True)
        self.log_file = os.path.join(log_dir, 'train.log')

        # Configure file logging
        logging.basicConfig(
            filename=self.log_file,
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )

        # Configure console logging
        console = logging.StreamHandler()
        console.setLevel(logging.INFO)
        logging.getLogger().addHandler(console)

    def log_text(self, message):
        logging.info(message)
```
- (Optional) Add TensorBoard support (see the sketch below)
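A minimal sketch of the optional TensorBoard support, using torch.utils.tensorboard.SummaryWriter (available with the torch/tensorboard versions in requirements.txt); the wrapper name and the log_scalar method are illustrative, not an existing API:
```python
from torch.utils.tensorboard import SummaryWriter

class TensorBoardLogger:
    def __init__(self, log_dir):
        self.writer = SummaryWriter(log_dir=log_dir)

    def log_scalar(self, tag, value, step):
        # e.g. log_scalar('train/loss', loss, epoch)
        self.writer.add_scalar(tag, value, step)

    def close(self):
        self.writer.close()
```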
Step 4: Develop the Compatibility Module¶
File: environment/compatibility.py
4.1 Interface Design¶
```python
class CompatibilityModule:
    def __init__(self, backend='autowzry-lite', image_width=84, image_height=84):
        self.backend = backend
        self.image_width = image_width
        self.image_height = image_height
        self.device = None
        self.init_device()

    def init_device(self):
        """Initialize the device connection."""
        try:
            if self.backend == 'autowzry':
                import autowzry
                self.device = autowzry.Device()
            else:
                import autowzry_lite as autowzry
                self.device = autowzry.Device()
        except Exception as e:
            raise RuntimeError(f"Failed to initialize device: {e}")

    def capture_screen(self):
        """Capture a screenshot and preprocess it."""
        # Call the autowzry screenshot interface
        frame = self.device.screenshot()
        # Preprocess
        from utils.image_processing import preprocess_frame
        processed = preprocess_frame(frame, self.image_width, self.image_height)
        return processed

    def execute_action(self, action):
        """Execute an action."""
        # Dispatch to the matching interface based on the action dict
        if action['type'] == 'move':
            self.device.move(angle=action['direction'], duration=0.5)
        elif action['type'] == 'attack':
            self.device.attack()
        # ... other actions

    def is_battle_running(self):
        """Check whether a battle is in progress."""
        return self.device.is_in_battle()

    def enter_battle(self):
        """Enter a battle."""
        if self.backend == 'autowzry':
            self.device.start_game()
        self.device.enter_battle()
```
4.2 Test¶
```python
# Test the connection
compat = CompatibilityModule()
frame = compat.capture_screen()
print(frame.shape)  # should be (84, 84)

# Test action execution
action = {'type': 'move', 'direction': 0}
compat.execute_action(action)
```
Step 5: Develop the Action Space¶
File: environment/action_space.py
5.1 Define the Action Space¶
```python
class ActionSpace:
    def __init__(self, enabled_actions=['move', 'attack']):
        self.enabled_actions = enabled_actions
        self.actions = {}
        self.define_action_space()

    def define_action_space(self):
        """Build the action dictionary."""
        idx = 0
        # Movement actions (8 directions)
        if 'move' in self.enabled_actions:
            for angle in [0, 45, 90, 135, 180, 225, 270, 315]:
                self.actions[idx] = {'type': 'move', 'direction': angle}
                idx += 1
        # Attack action
        if 'attack' in self.enabled_actions:
            self.actions[idx] = {'type': 'attack', 'target': 'auto'}
            idx += 1
        # Skill actions
        if 'skill' in self.enabled_actions:
            for slot in [1, 2, 3]:
                self.actions[idx] = {'type': 'skill', 'slot': slot}
                idx += 1

    def encode_action(self, command):
        """Command → index."""
        for idx, action in self.actions.items():
            if action == command:
                return idx
        return 0  # default action

    def decode_action(self, index):
        """Index → command."""
        return self.actions.get(index, self.actions[0])

    def get_dim(self):
        """Return the size of the action space."""
        return len(self.actions)
```
5.2 Test¶
```python
action_space = ActionSpace()
print(f"Action dim: {action_space.get_dim()}")  # should be 9 (8 move + 1 attack)

# Test encoding/decoding
action = {'type': 'move', 'direction': 90}
idx = action_space.encode_action(action)
decoded = action_space.decode_action(idx)
print(decoded)  # should equal action
```
Step 6: Develop State Evaluation and Rewards¶
File: environment/reward_evaluator.py
6.1 Initial Implementation (Simplified)¶
```python
import cv2
import numpy as np

class RewardEvaluator:
    def __init__(self, enabled_detections=['kill', 'death'],
                 enabled_rewards=['kill', 'death']):
        self.enabled_detections = enabled_detections
        self.enabled_rewards = enabled_rewards
        # Reward dictionary
        self.reward_dict = {
            'default': 0.01,
            'kill': 5.0,
            'death': -2.0,
            'assist': 1.0,
        }

    def detect_events(self, frame):
        """Detect game events."""
        events = {
            'kill_count': 0,
            'death_count': 0,
            'assist_count': 0,
            'is_dead': False,
        }
        if 'death' in self.enabled_detections:
            events['is_dead'] = self._detect_death(frame)
        # other detections...
        return events

    def compute_reward(self, prev_state, curr_state):
        """Compute the reward."""
        reward = self.reward_dict['default']
        prev_events = self.detect_events(prev_state)
        curr_events = self.detect_events(curr_state)
        # Kill detection
        if 'kill' in self.enabled_rewards:
            if curr_events['kill_count'] > prev_events['kill_count']:
                reward += self.reward_dict['kill']
        # Death detection
        if 'death' in self.enabled_rewards:
            if curr_events['is_dead'] and not prev_events['is_dead']:
                reward += self.reward_dict['death']
        return reward

    def _detect_death(self, frame):
        """Detect death (simplified implementation).

        Note: the code below assumes `frame` is a raw BGR screenshot with
        0-255 pixel values, not the preprocessed grayscale frame.
        """
        # Example: check whether the screen center darkens (a gray filter usually appears on death)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        center = gray[frame.shape[0]//2-50:frame.shape[0]//2+50,
                      frame.shape[1]//2-50:frame.shape[1]//2+50]
        mean_brightness = np.mean(center)
        return mean_brightness < 50  # threshold needs tuning
```
6.2 Later Improvements¶
- Use template matching to detect UI elements (see the sketch below)
- Use OCR to read numbers
- Add more state detection (HP, position, etc.)
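To illustrate the template-matching idea, here is a hedged sketch based on cv2.matchTemplate. The template path and the 0.8 threshold are placeholders; real templates would be cropped from screenshots and stored under the templates/ directory created in Step 1.
```python
import cv2

def template_present(frame, template_path, threshold=0.8):
    """Return True if the given UI template appears in the BGR frame."""
    template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    result = cv2.matchTemplate(gray, template, cv2.TM_CCOEFF_NORMED)
    _, max_val, _, _ = cv2.minMaxLoc(result)
    return max_val >= threshold

# Hypothetical usage inside RewardEvaluator.detect_events():
# events['is_dead'] = template_present(frame, 'templates/death_screen.png')
```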
Step 7: Develop the Data Collection Module¶
File: data/experience_collector.py
7.1 Implement the Collector¶
```python
import os
import time
from datetime import datetime

import h5py
import numpy as np

class ExperienceCollector:
    def __init__(self, compatibility_module, action_space, reward_evaluator,
                 mode='spectate', save_dir='./data/episodes'):
        self.compat = compatibility_module
        self.action_space = action_space
        self.reward_evaluator = reward_evaluator
        self.mode = mode
        self.save_dir = save_dir
        self.model = None
        self.current_episode = []
        os.makedirs(save_dir, exist_ok=True)

    def set_model(self, model):
        self.model = model

    def collect_one_step(self):
        """Collect one step of experience."""
        # Get the current state
        state = self.compat.capture_screen()

        # Decide the action based on the mode
        if self.mode == 'spectate':
            # Spectate mode: wait a moment, take another screenshot, then infer the action
            time.sleep(0.5)
            next_state = self.compat.capture_screen()
            # Infer the action from the frame difference (simplified implementation)
            from utils.action_inference import infer_action
            action = infer_action(state, next_state)
        elif self.mode == 'battle':
            # Battle mode: model inference
            action = self.model.predict(state)

        action_cmd = self.action_space.decode_action(action)
        # Execute the action
        self.compat.execute_action(action_cmd)
        time.sleep(0.5)
        next_state = self.compat.capture_screen()

        # Compute the reward
        reward = self.reward_evaluator.compute_reward(state, next_state)
        # Check whether the episode is done
        done = not self.compat.is_battle_running()

        experience = (state, action, reward, next_state, done)
        self.current_episode.append(experience)
        return experience

    def collect_episode(self, max_steps=10000):
        """Collect a full episode."""
        self.current_episode = []
        step = 0
        while self.compat.is_battle_running() and step < max_steps:
            self.collect_one_step()
            step += 1
            if step % 100 == 0:
                print(f"Collected {step} steps...")
        print(f"Episode finished with {len(self.current_episode)} steps")
        return self.current_episode

    def save_episode(self, episode=None, filename=None):
        """Save the episode to HDF5."""
        if episode is None:
            episode = self.current_episode
        if len(episode) == 0:
            return None
        # Generate a filename
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"episode_{timestamp}_{len(episode)}steps.h5"
        filepath = os.path.join(self.save_dir, filename)
        # Split the transitions into arrays
        states = np.array([exp[0] for exp in episode])
        actions = np.array([exp[1] for exp in episode])
        rewards = np.array([exp[2] for exp in episode])
        next_states = np.array([exp[3] for exp in episode])
        dones = np.array([exp[4] for exp in episode])
        # Save
        with h5py.File(filepath, 'w') as f:
            f.create_dataset('states', data=states, compression='gzip')
            f.create_dataset('actions', data=actions)
            f.create_dataset('rewards', data=rewards)
            f.create_dataset('next_states', data=next_states, compression='gzip')
            f.create_dataset('dones', data=dones)
            f.attrs['mode'] = self.mode
            f.attrs['num_steps'] = len(episode)
            f.attrs['total_reward'] = np.sum(rewards)
        print(f"Saved episode to {filepath}")
        self.current_episode = []
        return filepath
```
7.2 Implement Action Inference (Simplified)¶
File: utils/action_inference.py
```python
import numpy as np

def infer_action(prev_frame, curr_frame):
    """
    Infer the action from a frame difference (simplified implementation).
    A real application would need much more sophisticated logic.
    """
    # Compute the mean difference
    diff = np.abs(curr_frame - prev_frame).mean()
    # Simple rule: a large difference is treated as movement
    if diff > 0.1:
        # Pick a random movement direction (placeholder)
        return np.random.randint(0, 8)
    else:
        return 0  # default action
```
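Steps 2 through 5 each end with a short test; a comparable smoke test for the collector, assuming a device is connected and a match is running, might look like this:
```python
from environment.compatibility import CompatibilityModule
from environment.action_space import ActionSpace
from environment.reward_evaluator import RewardEvaluator
from data.experience_collector import ExperienceCollector

compat = CompatibilityModule()
collector = ExperienceCollector(compat, ActionSpace(), RewardEvaluator(),
                                mode='spectate', save_dir='./data/episodes')

# Keep the episode short for a first run
episode = collector.collect_episode(max_steps=50)
print(collector.save_episode(episode))  # prints the saved .h5 path (or None)
```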
Step 8: Develop the Replay Buffer¶
File: data/replay_buffer.py
```python
import os
import glob
from collections import deque

import h5py
import numpy as np

class ReplayBuffer:
    def __init__(self, max_size=100000):
        self.buffer = deque(maxlen=max_size)
        self.max_size = max_size

    def push(self, experience):
        self.buffer.append(experience)

    def load_episode_file(self, filepath):
        """Load a single episode file."""
        with h5py.File(filepath, 'r') as f:
            num_steps = len(f['states'])
            print(f"Loading {num_steps} experiences from {filepath}")
            for i in range(num_steps):
                experience = (
                    f['states'][i],
                    f['actions'][i],
                    f['rewards'][i],
                    f['next_states'][i],
                    f['dones'][i]
                )
                self.push(experience)

    def load_from_directory(self, directory, pattern='episode_*.h5', max_files=None):
        """Load episode files in batch."""
        file_pattern = os.path.join(directory, pattern)
        episode_files = sorted(glob.glob(file_pattern))
        if max_files:
            episode_files = episode_files[:max_files]
        print(f"Found {len(episode_files)} episode files")
        for filepath in episode_files:
            self.load_episode_file(filepath)
            print(f"Buffer size: {len(self.buffer)}/{self.max_size}")
            if len(self.buffer) >= self.max_size:
                print("Buffer full")
                break

    def sample(self, batch_size=32):
        """Randomly sample a batch."""
        indices = np.random.choice(len(self.buffer), batch_size, replace=False)
        batch = [self.buffer[i] for i in indices]
        states = np.array([exp[0] for exp in batch])
        actions = np.array([exp[1] for exp in batch])
        rewards = np.array([exp[2] for exp in batch])
        next_states = np.array([exp[3] for exp in batch])
        dones = np.array([exp[4] for exp in batch])
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)

    def get_statistics(self):
        """Buffer statistics."""
        if len(self.buffer) == 0:
            return {}
        rewards = [exp[2] for exp in self.buffer]
        return {
            'size': len(self.buffer),
            'avg_reward': np.mean(rewards),
            'max_reward': np.max(rewards),
            'min_reward': np.min(rewards),
        }
```
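In the spirit of the earlier test snippets, a quick check that the buffer loads and samples correctly (assuming a few episodes already exist under ./data/episodes):
```python
from data.replay_buffer import ReplayBuffer

buffer = ReplayBuffer(max_size=100000)
buffer.load_from_directory('./data/episodes', max_files=2)
print(buffer.get_statistics())

states, actions, rewards, next_states, dones = buffer.sample(batch_size=32)
print(states.shape, actions.shape, rewards.shape)  # e.g. (32, 84, 84) (32,) (32,)
```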
Step 9: Develop the Model and Trainer¶
File: core/model.py
```python
import torch
import torch.nn as nn
import numpy as np

class BaseModel(nn.Module):
    def forward(self, x):
        raise NotImplementedError

    def save(self, path):
        torch.save(self.state_dict(), path)

    def load(self, path):
        self.load_state_dict(torch.load(path))

class SimpleConvNet(BaseModel):
    def __init__(self, input_channels=4, action_dim=10):
        super().__init__()
        self.action_dim = action_dim
        self.conv_layers = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )
        # Compute the flattened size of the conv output
        conv_out_size = self._get_conv_out_size(input_channels)
        self.fc_layers = nn.Sequential(
            nn.Linear(conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, action_dim)
        )

    def _get_conv_out_size(self, input_channels):
        dummy_input = torch.zeros(1, input_channels, 84, 84)
        conv_out = self.conv_layers(dummy_input)
        return int(np.prod(conv_out.size()))

    def forward(self, x):
        conv_out = self.conv_layers(x)
        flat = conv_out.view(conv_out.size(0), -1)
        q_values = self.fc_layers(flat)
        return q_values
```
File: core/trainer.py
```python
import torch
import torch.nn as nn
import torch.optim as optim

class DQNTrainer:
    def __init__(self, model, target_model, learning_rate=1e-4, gamma=0.99):
        self.model = model
        self.target_model = target_model
        self.gamma = gamma
        self.optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        self.loss_fn = nn.MSELoss()
        self.update_target_network()

    def train_step(self, batch):
        states, actions, rewards, next_states, dones = batch
        # Convert to tensors
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(next_states)
        dones = torch.FloatTensor(dones)

        # Current Q-values
        current_q_values = self.model(states).gather(1, actions.unsqueeze(1)).squeeze()
        # Target Q-values
        with torch.no_grad():
            next_q_values = self.target_model(next_states).max(1)[0]
            target_q_values = rewards + self.gamma * next_q_values * (1 - dones)

        # Compute the loss
        loss = self.loss_fn(current_q_values, target_q_values)
        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def update_target_network(self):
        self.target_model.load_state_dict(self.model.state_dict())
```
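A quick smoke test of the model and trainer with random data, requiring no collected episodes (batch shapes follow the 4-channel, 84x84 input of SimpleConvNet):
```python
import numpy as np
from core.model import SimpleConvNet
from core.trainer import DQNTrainer

model = SimpleConvNet(input_channels=4, action_dim=9)
target_model = SimpleConvNet(input_channels=4, action_dim=9)
trainer = DQNTrainer(model, target_model)

# One fake batch of 8 transitions
batch = (
    np.random.rand(8, 4, 84, 84).astype(np.float32),  # states
    np.random.randint(0, 9, size=8),                   # actions
    np.random.rand(8).astype(np.float32),              # rewards
    np.random.rand(8, 4, 84, 84).astype(np.float32),   # next_states
    np.zeros(8, dtype=np.float32),                     # dones
)
print(trainer.train_step(batch))  # should print a finite loss value
```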
Step 10: Write the Collection and Training Scripts¶
File: scripts/collect.py
```python
import sys
sys.path.append('..')

from config.config import Config
from data.experience_collector import ExperienceCollector
from environment.compatibility import CompatibilityModule
from environment.action_space import ActionSpace
from environment.reward_evaluator import RewardEvaluator

def main():
    config, args = Config.from_args()
    print("=== Data Collection ===")
    print(f"Mode: {config.environment.mode}")

    # Initialize the modules
    compat = CompatibilityModule()
    action_space = ActionSpace()
    reward_eval = RewardEvaluator()
    collector = ExperienceCollector(compat, action_space, reward_eval,
                                    mode=config.environment.mode)

    # Collect data
    for i in range(config.hyperparams.num_episodes):
        print(f"\nCollecting episode {i+1}...")
        episode = collector.collect_episode()
        collector.save_episode(episode)

if __name__ == '__main__':
    main()
```
File: scripts/train.py
```python
import sys
sys.path.append('..')

import torch
from config.config import Config
from data.replay_buffer import ReplayBuffer
from core.model import SimpleConvNet
from core.trainer import DQNTrainer

def main():
    config, args = Config.from_args()
    print("=== Training ===")

    # Load data
    buffer = ReplayBuffer(max_size=config.hyperparams.buffer_size)
    buffer.load_from_directory(config.paths.episodes_dir)
    print(f"Loaded {len(buffer)} experiences")

    # Initialize the models
    from environment.action_space import ActionSpace
    action_space = ActionSpace()
    model = SimpleConvNet(input_channels=4, action_dim=action_space.get_dim())
    target_model = SimpleConvNet(input_channels=4, action_dim=action_space.get_dim())
    trainer = DQNTrainer(model, target_model)

    # Train
    for epoch in range(config.hyperparams.num_epochs):
        batch = buffer.sample(batch_size=config.hyperparams.batch_size)
        loss = trainer.train_step(batch)
        if epoch % 10 == 0:
            print(f"Epoch {epoch}, Loss: {loss:.4f}")
            trainer.update_target_network()

    # Save the model
    model.save(f"{config.paths.models_dir}/final_model.pth")
    print("Training complete!")

if __name__ == '__main__':
    main()
```
Debugging Tips¶
1. Test Module by Module¶
- Write test code as soon as each module is finished
- Make sure each interface behaves as expected
2. Data Visualization¶
- Save a few screenshots to inspect the preprocessing results (see the sketch below)
- Print experience samples to verify the data format
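A hedged sketch of such a spot-check, assuming the Step 3 and Step 4 helpers are in place and that the backend screenshot is a BGR image; the output files are written into the logs/ directory created in Step 1:
```python
import cv2
from environment.compatibility import CompatibilityModule
from utils.image_processing import preprocess_frame

compat = CompatibilityModule()
raw = compat.device.screenshot()    # raw screenshot from the backend
processed = preprocess_frame(raw)   # 84x84 grayscale in [0, 1]

cv2.imwrite('logs/raw_frame.png', raw)
cv2.imwrite('logs/processed_frame.png', (processed * 255).astype('uint8'))
```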
3. Validate on a Small Dataset¶
- Test the training pipeline with a small amount of data first
- Verify that the model can overfit it, which confirms it is able to learn (see the sketch below)
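One way to run that overfitting check, reusing the buffer and trainer from Steps 8 and 9, is to train repeatedly on a single fixed batch and watch the loss approach zero:
```python
# Assumes `buffer` and `trainer` have been created as in scripts/train.py
batch = buffer.sample(batch_size=8)
for i in range(200):
    loss = trainer.train_step(batch)
    if i % 50 == 0:
        print(f"iter {i}, loss {loss:.6f}")
# If the loss does not shrink on this fixed batch, the model or data pipeline is broken.
```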
4. Verbose Logging¶
- Print key metrics during training
- Save model checkpoints regularly so training can be resumed
FAQ¶
Q1: Screenshots are too slow¶
A: Lower the screenshot resolution, or use ADB's faster screenshot path (see the sketch below)
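One possible implementation of the ADB approach (assuming adb is on PATH and exactly one device is connected): stream the PNG over adb exec-out and decode it in memory, avoiding a temporary file on the device.
```python
import subprocess
import cv2
import numpy as np

def adb_screenshot():
    # 'adb exec-out screencap -p' writes the PNG bytes straight to stdout
    png_bytes = subprocess.run(
        ['adb', 'exec-out', 'screencap', '-p'],
        capture_output=True, check=True
    ).stdout
    # Decode in memory; returns a BGR image (or None if decoding failed)
    return cv2.imdecode(np.frombuffer(png_bytes, dtype=np.uint8), cv2.IMREAD_COLOR)
```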
Q2: State recognition is inaccurate¶
A: Start with simple rules, and refine them after collecting some data
Q3: Training is unstable¶
A: Tune the learning rate and the target-network update frequency
Q4: Running out of memory¶
A: Reduce the buffer size, or keep the data on disk and read it lazily (see the sketch below)
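A hedged sketch of the disk-backed idea: keep the HDF5 file open and read only the sampled indices, instead of loading every transition into the deque (h5py requires the index list to be in increasing order):
```python
import h5py
import numpy as np

class DiskReplay:
    """Samples transitions directly from one episode file without loading it all into RAM."""
    def __init__(self, filepath):
        self.f = h5py.File(filepath, 'r')
        self.size = len(self.f['states'])

    def sample(self, batch_size=32):
        idx = np.sort(np.random.choice(self.size, batch_size, replace=False))
        return (self.f['states'][idx], self.f['actions'][idx],
                self.f['rewards'][idx], self.f['next_states'][idx],
                self.f['dones'][idx])
```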
End of document