
autowzry-agent Development Guide

Version: v1.0  Date: 2025-01-12


Development Environment Setup

System Requirements

  • Python 3.8+
  • CUDA 11.0+ (optional, for GPU acceleration)
  • ADB (Android Debug Bridge)
  • An Android device or emulator with 王者荣耀 (Honor of Kings) installed

Dependency Installation

# Create a virtual environment
python -m venv venv
source venv/bin/activate  # Windows: venv\Scripts\activate

# Install dependencies
pip install -r requirements.txt

Contents of requirements.txt:

torch>=2.0.0
torchvision>=0.15.0
numpy>=1.21.0
opencv-python>=4.5.0
h5py>=3.7.0
PyYAML>=6.0
tensorboard>=2.10.0  # optional
pytesseract>=0.3.10  # optional, for OCR

Development Workflow Guide

Step 1: Create the Project Structure

# Run the following commands to create the directory structure
mkdir -p config core environment data utils scripts
mkdir -p data/episodes models/checkpoints logs templates
touch config/__init__.py core/__init__.py environment/__init__.py
touch data/__init__.py utils/__init__.py scripts/__init__.py

Step 2: Develop the Configuration Module

2.1 Define the configuration classes

File: config/config.py

Development steps (a sketch covering these steps follows the list):

  1. Define a HyperParameters dataclass - list all hyperparameters and set reasonable defaults
  2. Define a PathConfig dataclass - define all path constants
  3. Define an EnvironmentConfig dataclass - define environment-related settings
  4. Implement the Config class:
     • from_yaml(): read the file with yaml.safe_load()
     • save_yaml(): save with yaml.dump()
     • from_args(): parse the command line with argparse
     • create_argument_parser(): define all arguments
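A minimal sketch of config/config.py along these lines; the exact fields, defaults, and command-line flags are illustrative assumptions rather than a fixed schema:

from dataclasses import dataclass, field, asdict
import argparse
import yaml

@dataclass
class HyperParameters:
    input_channels: int = 4
    learning_rate: float = 1e-4
    batch_size: int = 32
    buffer_size: int = 100000
    num_epochs: int = 1000
    num_episodes: int = 10

@dataclass
class PathConfig:
    data_dir: str = './data'
    episodes_dir: str = './data/episodes'
    models_dir: str = './models/checkpoints'

@dataclass
class EnvironmentConfig:
    mode: str = 'spectate'

@dataclass
class Config:
    hyperparams: HyperParameters = field(default_factory=HyperParameters)
    paths: PathConfig = field(default_factory=PathConfig)
    environment: EnvironmentConfig = field(default_factory=EnvironmentConfig)

    @classmethod
    def from_yaml(cls, path):
        # Read the YAML file and build dataclasses from its sections
        with open(path) as f:
            data = yaml.safe_load(f) or {}
        return cls(
            hyperparams=HyperParameters(**data.get('hyperparameters', {})),
            paths=PathConfig(**data.get('paths', {})),
            environment=EnvironmentConfig(**data.get('environment', {})),
        )

    def save_yaml(self, path):
        data = {
            'hyperparameters': asdict(self.hyperparams),
            'paths': asdict(self.paths),
            'environment': asdict(self.environment),
        }
        with open(path, 'w') as f:
            yaml.dump(data, f)

    @classmethod
    def generate_default_config(cls, path):
        cls().save_yaml(path)

    @staticmethod
    def create_argument_parser():
        parser = argparse.ArgumentParser()
        parser.add_argument('--batch-size', type=int)
        parser.add_argument('--learning-rate', type=float)
        parser.add_argument('--mode', choices=['spectate', 'battle'])
        return parser

    @classmethod
    def from_args(cls, argv=None):
        # Start from defaults and only override what was passed on the command line
        args = cls.create_argument_parser().parse_args(argv)
        config = cls()
        if args.batch_size is not None:
            config.hyperparams.batch_size = args.batch_size
        if args.learning_rate is not None:
            config.hyperparams.learning_rate = args.learning_rate
        if args.mode is not None:
            config.environment.mode = args.mode
        return config, args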

2.2 Create the default configuration file

File: config/default_config.yaml

Structure:

hyperparameters:
  input_channels: 4
  learning_rate: 0.0001
  # ... other parameters

paths:
  data_dir: ./data
  # ... other paths

environment:
  mode: spectate
  # ... other settings

2.3 Test the configuration module

# Test script
from config.config import Config

# Generate the default config
Config.generate_default_config('config/test_config.yaml')

# Load the config
config = Config.from_yaml('config/test_config.yaml')
print(config.hyperparams.learning_rate)

# Test command-line arguments
config, args = Config.from_args(['--batch-size', '64'])
print(config.hyperparams.batch_size)  # should be 64

Step 3: Develop the Utility Modules

3.1 Image preprocessing

File: utils/image_processing.py

Development steps: implement the following functions.

  1. resize_frame()
  2. to_grayscale()
  3. normalize()
  4. stack_frames()
  5. preprocess_frame()

import cv2
import numpy as np

def resize_frame(frame, width, height):
    return cv2.resize(frame, (width, height))

def to_grayscale(frame):
    return cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

def normalize(frame):
    return frame.astype(np.float32) / 255.0

def stack_frames(frames):
    return np.stack(frames, axis=0)

def preprocess_frame(frame, width=84, height=84, grayscale=True):
    frame = resize_frame(frame, width, height)
    if grayscale:
        frame = to_grayscale(frame)
    frame = normalize(frame)
    return frame

Test:

import numpy as np
from utils.image_processing import preprocess_frame

# Simulate a frame
frame = np.random.randint(0, 255, (1920, 1080, 3), dtype=np.uint8)
processed = preprocess_frame(frame)
print(processed.shape)  # should be (84, 84)
print(processed.min(), processed.max())  # should be within [0, 1]

3.2 Logging utility

File: utils/logger.py

Development steps:

  1. Implement basic logging

import logging
import os

class Logger:
    def __init__(self, log_dir, use_tensorboard=False):
        os.makedirs(log_dir, exist_ok=True)
        self.log_file = os.path.join(log_dir, 'train.log')

        # Configure file logging
        logging.basicConfig(
            filename=self.log_file,
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )

        # Configure console logging
        console = logging.StreamHandler()
        console.setLevel(logging.INFO)
        logging.getLogger().addHandler(console)

    def log_text(self, message):
        logging.info(message)

  2. (Optional) Add TensorBoard support; one possible sketch follows
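One possible shape for the optional TensorBoard support, building on the Logger class above and assuming the optional tensorboard dependency from requirements.txt is installed; the subclass and the log_scalar method name are illustrative:

from torch.utils.tensorboard import SummaryWriter  # uses the optional tensorboard dependency

class TensorBoardLogger(Logger):
    """Illustrative extension of the Logger above with scalar logging for TensorBoard."""

    def __init__(self, log_dir, use_tensorboard=True):
        super().__init__(log_dir, use_tensorboard)
        # Create the writer only when TensorBoard logging is requested
        self.writer = SummaryWriter(log_dir=log_dir) if use_tensorboard else None

    def log_scalar(self, tag, value, step):
        # Example: logger.log_scalar('train/loss', loss, epoch)
        if self.writer is not None:
            self.writer.add_scalar(tag, value, step)

    def close(self):
        if self.writer is not None:
            self.writer.close()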

Step 4: Develop the Compatibility Module

File: environment/compatibility.py

4.1 Interface design

class CompatibilityModule:
    def __init__(self, backend='autowzry-lite', image_width=84, image_height=84):
        self.backend = backend
        self.image_width = image_width
        self.image_height = image_height
        self.device = None
        self.init_device()

    def init_device(self):
        """初始化设备连接"""
        try:
            if self.backend == 'autowzry':
                import autowzry
                self.device = autowzry.Device()
            else:
                import autowzry_lite as autowzry
                self.device = autowzry.Device()
        except Exception as e:
            raise RuntimeError(f"Failed to initialize device: {e}")

    def capture_screen(self):
        """截图并预处理"""
        # 调用 autowzry 截图接口
        frame = self.device.screenshot()

        # Preprocess
        from utils.image_processing import preprocess_frame
        processed = preprocess_frame(frame, self.image_width, self.image_height)

        return processed

    def execute_action(self, action):
        """执行动作"""
        # 根据 action dict 调用对应接口
        if action['type'] == 'move':
            self.device.move(angle=action['direction'], duration=0.5)
        elif action['type'] == 'attack':
            self.device.attack()
        # ... other actions

    def is_battle_running(self):
        """判断对战是否进行中"""
        return self.device.is_in_battle()

    def enter_battle(self):
        """进入对战"""
        if self.backend == 'autowzry':
            self.device.start_game()
            self.device.enter_battle()

4.2 Test

# Test the connection
compat = CompatibilityModule()
frame = compat.capture_screen()
print(frame.shape)  # should be (84, 84)

# Test action execution
action = {'type': 'move', 'direction': 0}
compat.execute_action(action)

Step 5: Develop the Action Space

File: environment/action_space.py

5.1 Define the action space

class ActionSpace:
    def __init__(self, enabled_actions=('move', 'attack')):
        self.enabled_actions = enabled_actions
        self.actions = {}
        self.define_action_space()

    def define_action_space(self):
        """定义动作字典"""
        idx = 0

        # 移动动作 (8方向)
        if 'move' in self.enabled_actions:
            for angle in [0, 45, 90, 135, 180, 225, 270, 315]:
                self.actions[idx] = {'type': 'move', 'direction': angle}
                idx += 1

        # Attack action
        if 'attack' in self.enabled_actions:
            self.actions[idx] = {'type': 'attack', 'target': 'auto'}
            idx += 1

        # Skill actions
        if 'skill' in self.enabled_actions:
            for slot in [1, 2, 3]:
                self.actions[idx] = {'type': 'skill', 'slot': slot}
                idx += 1

    def encode_action(self, command):
        """命令 → 索引"""
        for idx, action in self.actions.items():
            if action == command:
                return idx
        return 0  # default action

    def decode_action(self, index):
        """索引 → 命令"""
        return self.actions.get(index, self.actions[0])

    def get_dim(self):
        """返回动作空间维度"""
        return len(self.actions)

5.2 Test

action_space = ActionSpace()
print(f"Action dim: {action_space.get_dim()}")  # should be 9 (8 move + 1 attack)

# Test encoding/decoding
action = {'type': 'move', 'direction': 90}
idx = action_space.encode_action(action)
decoded = action_space.decode_action(idx)
print(decoded)  # should equal action

Step 6: Develop State Evaluation and Rewards

File: environment/reward_evaluator.py

6.1 Initial implementation (simplified)

import cv2
import numpy as np

class RewardEvaluator:
    def __init__(self, enabled_detections=('kill', 'death'),
                 enabled_rewards=('kill', 'death')):
        self.enabled_detections = enabled_detections
        self.enabled_rewards = enabled_rewards

        # Reward dictionary
        self.reward_dict = {
            'default': 0.01,
            'kill': 5.0,
            'death': -2.0,
            'assist': 1.0,
        }

    def detect_events(self, frame):
        """检测游戏事件"""
        events = {
            'kill_count': 0,
            'death_count': 0,
            'assist_count': 0,
            'is_dead': False,
        }

        if 'death' in self.enabled_detections:
            events['is_dead'] = self._detect_death(frame)

        # Other detections...

        return events

    def compute_reward(self, prev_state, curr_state):
        """计算奖励"""
        reward = self.reward_dict['default']

        prev_events = self.detect_events(prev_state)
        curr_events = self.detect_events(curr_state)

        # Check for kills
        if 'kill' in self.enabled_rewards:
            if curr_events['kill_count'] > prev_events['kill_count']:
                reward += self.reward_dict['kill']

        # Check for death
        if 'death' in self.enabled_rewards:
            if curr_events['is_dead'] and not prev_events['is_dead']:
                reward += self.reward_dict['death']

        return reward

    def _detect_death(self, frame):
        """检测死亡(简化实现)"""
        # 示例: 检测屏幕中心是否变暗(死亡时通常有灰色滤镜)
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        center = gray[frame.shape[0]//2-50:frame.shape[0]//2+50,
                      frame.shape[1]//2-50:frame.shape[1]//2+50]
        mean_brightness = np.mean(center)
        return mean_brightness < 50  # threshold needs tuning; assumes a raw 0-255 frame

6.2 Later refinements

  • Use template matching to detect UI elements (see the sketch below)
  • Use OCR to read numbers
  • Add more state detection (health, position, etc.)
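A hedged template-matching sketch using cv2.matchTemplate; the template path under templates/ and the 0.8 threshold are illustrative assumptions, not fixed by this guide:

import cv2
import numpy as np

def detect_ui_element(frame, template_path='templates/kill_icon.png', threshold=0.8):
    """Return True if the template appears somewhere in the frame (grayscale matching)."""
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if frame.ndim == 3 else frame
    template = cv2.imread(template_path, cv2.IMREAD_GRAYSCALE)
    if template is None:
        raise FileNotFoundError(f"Template not found: {template_path}")

    # Normalized cross-correlation; values close to 1.0 mean a strong match
    result = cv2.matchTemplate(gray, template, cv2.TM_CCOEFF_NORMED)
    _, max_val, _, max_loc = cv2.minMaxLoc(result)
    return max_val >= threshold

Cropped template images can live in the templates/ directory created in Step 1.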

Step 7: Develop the Data Collection Module

File: data/experience_collector.py

7.1 Implement the collector

import os
import h5py
import time
import numpy as np
from datetime import datetime

class ExperienceCollector:
    def __init__(self, compatibility_module, action_space, reward_evaluator,
                 mode='spectate', save_dir='./data/episodes'):
        self.compat = compatibility_module
        self.action_space = action_space
        self.reward_evaluator = reward_evaluator
        self.mode = mode
        self.save_dir = save_dir
        self.model = None
        self.current_episode = []

        os.makedirs(save_dir, exist_ok=True)

    def set_model(self, model):
        self.model = model

    def collect_one_step(self):
        """收集一步经验"""
        # 获取当前状态
        state = self.compat.capture_screen()

        # 根据模式决定动作
        if self.mode == 'spectate':
            # 观战模式: 等待一段时间再截图,推断动作
            time.sleep(0.5)
            next_state = self.compat.capture_screen()

            # Infer the action from the frame difference (simplified implementation)
            from utils.action_inference import infer_action
            action = infer_action(state, next_state)

        elif self.mode == 'battle':
            # Battle mode: model inference
            action = self.model.predict(state)
            action_cmd = self.action_space.decode_action(action)

            # Execute the action
            self.compat.execute_action(action_cmd)
            time.sleep(0.5)

            next_state = self.compat.capture_screen()

        # Compute the reward
        reward = self.reward_evaluator.compute_reward(state, next_state)

        # Check whether the episode has ended
        done = not self.compat.is_battle_running()

        experience = (state, action, reward, next_state, done)
        self.current_episode.append(experience)

        return experience

    def collect_episode(self, max_steps=10000):
        """收集完整一局"""
        self.current_episode = []
        step = 0

        while self.compat.is_battle_running() and step < max_steps:
            self.collect_one_step()
            step += 1

            if step % 100 == 0:
                print(f"Collected {step} steps...")

        print(f"Episode finished with {len(self.current_episode)} steps")
        return self.current_episode

    def save_episode(self, episode=None, filename=None):
        """保存episode到HDF5"""
        if episode is None:
            episode = self.current_episode

        if len(episode) == 0:
            return None

        # Generate a filename
        if filename is None:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"episode_{timestamp}_{len(episode)}steps.h5"

        filepath = os.path.join(self.save_dir, filename)

        # Split the experience tuples into arrays
        states = np.array([exp[0] for exp in episode])
        actions = np.array([exp[1] for exp in episode])
        rewards = np.array([exp[2] for exp in episode])
        next_states = np.array([exp[3] for exp in episode])
        dones = np.array([exp[4] for exp in episode])

        # Save
        with h5py.File(filepath, 'w') as f:
            f.create_dataset('states', data=states, compression='gzip')
            f.create_dataset('actions', data=actions)
            f.create_dataset('rewards', data=rewards)
            f.create_dataset('next_states', data=next_states, compression='gzip')
            f.create_dataset('dones', data=dones)

            f.attrs['mode'] = self.mode
            f.attrs['num_steps'] = len(episode)
            f.attrs['total_reward'] = np.sum(rewards)

        print(f"Saved episode to {filepath}")
        self.current_episode = []
        return filepath

7.2 Implement action inference (simplified)

File: utils/action_inference.py

import numpy as np

def infer_action(prev_frame, curr_frame):
    """
    从图像差分推断动作 (简化实现)
    实际应用中需要更复杂的逻辑
    """
    # 计算差异
    diff = np.abs(curr_frame - prev_frame).mean()

    # 简单规则: 差异大则认为有移动
    if diff > 0.1:
        # 随机选择一个移动方向 (占位符)
        return np.random.randint(0, 8)
    else:
        return 0  # 默认动作

Step 8: Develop the Replay Buffer

File: data/replay_buffer.py

import os
import glob
import h5py
import numpy as np
from collections import deque

class ReplayBuffer:
    def __init__(self, max_size=100000):
        self.buffer = deque(maxlen=max_size)
        self.max_size = max_size

    def push(self, experience):
        self.buffer.append(experience)

    def load_episode_file(self, filepath):
        """加载单个episode文件"""
        with h5py.File(filepath, 'r') as f:
            num_steps = len(f['states'])
            print(f"Loading {num_steps} experiences from {filepath}")

            for i in range(num_steps):
                experience = (
                    f['states'][i],
                    f['actions'][i],
                    f['rewards'][i],
                    f['next_states'][i],
                    f['dones'][i]
                )
                self.push(experience)

    def load_from_directory(self, directory, pattern='episode_*.h5', max_files=None):
        """批量加载episode文件"""
        file_pattern = os.path.join(directory, pattern)
        episode_files = sorted(glob.glob(file_pattern))

        if max_files:
            episode_files = episode_files[:max_files]

        print(f"Found {len(episode_files)} episode files")

        for filepath in episode_files:
            self.load_episode_file(filepath)
            print(f"Buffer size: {len(self.buffer)}/{self.max_size}")

            if len(self.buffer) >= self.max_size:
                print("Buffer full")
                break

    def sample(self, batch_size=32):
        """随机采样"""
        indices = np.random.choice(len(self.buffer), batch_size, replace=False)
        batch = [self.buffer[i] for i in indices]

        states = np.array([exp[0] for exp in batch])
        actions = np.array([exp[1] for exp in batch])
        rewards = np.array([exp[2] for exp in batch])
        next_states = np.array([exp[3] for exp in batch])
        dones = np.array([exp[4] for exp in batch])

        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)

    def get_statistics(self):
        """统计信息"""
        if len(self.buffer) == 0:
            return {}

        rewards = [exp[2] for exp in self.buffer]
        return {
            'size': len(self.buffer),
            'avg_reward': np.mean(rewards),
            'max_reward': np.max(rewards),
            'min_reward': np.min(rewards),
        }

Step 9: Develop the Model and Trainer

File: core/model.py

import torch
import torch.nn as nn
import numpy as np

class BaseModel(nn.Module):
    def forward(self, x):
        raise NotImplementedError

    def save(self, path):
        torch.save(self.state_dict(), path)

    def load(self, path):
        self.load_state_dict(torch.load(path))

class SimpleConvNet(BaseModel):
    def __init__(self, input_channels=4, action_dim=10):
        super().__init__()
        self.action_dim = action_dim

        self.conv_layers = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=8, stride=4),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=4, stride=2),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, stride=1),
            nn.ReLU()
        )

        # Compute the flattened conv output size
        conv_out_size = self._get_conv_out_size(input_channels)

        self.fc_layers = nn.Sequential(
            nn.Linear(conv_out_size, 512),
            nn.ReLU(),
            nn.Linear(512, action_dim)
        )

    def _get_conv_out_size(self, input_channels):
        dummy_input = torch.zeros(1, input_channels, 84, 84)
        conv_out = self.conv_layers(dummy_input)
        return int(np.prod(conv_out.size()))

    def forward(self, x):
        conv_out = self.conv_layers(x)
        flat = conv_out.view(conv_out.size(0), -1)
        q_values = self.fc_layers(flat)
        return q_values

File: core/trainer.py

import torch
import torch.nn as nn
import torch.optim as optim

class DQNTrainer:
    def __init__(self, model, target_model, learning_rate=1e-4, gamma=0.99):
        self.model = model
        self.target_model = target_model
        self.gamma = gamma

        self.optimizer = optim.Adam(model.parameters(), lr=learning_rate)
        self.loss_fn = nn.MSELoss()

        self.update_target_network()

    def train_step(self, batch):
        states, actions, rewards, next_states, dones = batch

        # Convert to tensors
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(next_states)
        dones = torch.FloatTensor(dones)

        # Current Q-values
        current_q_values = self.model(states).gather(1, actions.unsqueeze(1)).squeeze()

        # Target Q-values
        with torch.no_grad():
            next_q_values = self.target_model(next_states).max(1)[0]
            target_q_values = rewards + self.gamma * next_q_values * (1 - dones)

        # Compute the loss
        loss = self.loss_fn(current_q_values, target_q_values)

        # Optimize
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()

    def update_target_network(self):
        self.target_model.load_state_dict(self.model.state_dict())

Step 10: Write the Collection and Training Scripts

File: scripts/collect.py

import sys
sys.path.append('..')

from config.config import Config
from data.experience_collector import ExperienceCollector
from environment.compatibility import CompatibilityModule
from environment.action_space import ActionSpace
from environment.reward_evaluator import RewardEvaluator

def main():
    config, args = Config.from_args()

    print("=== Data Collection ===")
    print(f"Mode: {config.environment.mode}")

    # Initialize modules
    compat = CompatibilityModule()
    action_space = ActionSpace()
    reward_eval = RewardEvaluator()

    collector = ExperienceCollector(compat, action_space, reward_eval,
                                   mode=config.environment.mode)

    # Collect data
    for i in range(config.hyperparams.num_episodes):
        print(f"\nCollecting episode {i+1}...")
        episode = collector.collect_episode()
        collector.save_episode(episode)

if __name__ == '__main__':
    main()

File: scripts/train.py

import sys
sys.path.append('..')

import torch
from config.config import Config
from data.replay_buffer import ReplayBuffer
from core.model import SimpleConvNet
from core.trainer import DQNTrainer

def main():
    config, args = Config.from_args()

    print("=== Training ===")

    # Load data
    buffer = ReplayBuffer(max_size=config.hyperparams.buffer_size)
    buffer.load_from_directory(config.paths.episodes_dir)
    print(f"Loaded {len(buffer)} experiences")

    # Initialize the models
    from environment.action_space import ActionSpace
    action_space = ActionSpace()

    model = SimpleConvNet(input_channels=4, action_dim=action_space.get_dim())
    target_model = SimpleConvNet(input_channels=4, action_dim=action_space.get_dim())

    trainer = DQNTrainer(model, target_model)

    # Train
    for epoch in range(config.hyperparams.num_epochs):
        batch = buffer.sample(batch_size=config.hyperparams.batch_size)
        loss = trainer.train_step(batch)

        if epoch % 10 == 0:
            print(f"Epoch {epoch}, Loss: {loss:.4f}")
            trainer.update_target_network()

    # Save the model
    model.save(f"{config.paths.models_dir}/final_model.pth")
    print("Training complete!")

if __name__ == '__main__':
    main()

Debugging Tips

1. Test module by module

  • Write test code as soon as each module is finished
  • Make sure the interfaces behave as expected

2. Visualize the data

  • Save a few screenshots to inspect the preprocessing results (see the sketch below)
  • Print experience samples to verify their format
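A quick visualization sketch, assuming a debug/ output directory (an illustrative choice); it saves a raw frame and its preprocessed counterpart so the resize/grayscale/normalize chain can be checked by eye:

import os
import cv2
import numpy as np
from utils.image_processing import preprocess_frame

os.makedirs('debug', exist_ok=True)

# Replace the random frame with a real screenshot from CompatibilityModule if available
frame = np.random.randint(0, 255, (1920, 1080, 3), dtype=np.uint8)
processed = preprocess_frame(frame)

cv2.imwrite('debug/raw_frame.png', frame)
# Scale the normalized [0, 1] image back to 0-255 before writing it to disk
cv2.imwrite('debug/processed_frame.png', (processed * 255).astype(np.uint8))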

3. Validate on a small dataset

  • Test the training pipeline with a small amount of data first
  • Verify that the model can overfit it, confirming it is able to learn (see the sketch below)
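A minimal overfitting sanity check, reusing SimpleConvNet and DQNTrainer from Step 9; the synthetic batch (4-channel 84x84 states) and loop length are illustrative assumptions, chosen only to exercise the training loop:

import numpy as np
from core.model import SimpleConvNet
from core.trainer import DQNTrainer

# A tiny synthetic batch with the shapes the trainer expects:
# states/next_states (B, C, H, W), actions (B,), rewards (B,), dones (B,)
B, C = 32, 4
states = np.random.rand(B, C, 84, 84).astype(np.float32)
next_states = np.random.rand(B, C, 84, 84).astype(np.float32)
actions = np.random.randint(0, 9, size=B)
rewards = np.random.rand(B).astype(np.float32)
dones = np.zeros(B, dtype=np.float32)
batch = (states, actions, rewards, next_states, dones)

model = SimpleConvNet(input_channels=C, action_dim=9)
target_model = SimpleConvNet(input_channels=C, action_dim=9)
trainer = DQNTrainer(model, target_model)

# Train repeatedly on the same batch; the loss should fall steadily
for step in range(200):
    loss = trainer.train_step(batch)
    if step % 20 == 0:
        print(f"step {step}: loss {loss:.4f}")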

4. Verbose logging

  • Print key metrics during training
  • Save checkpoints regularly so training can be resumed

FAQ

Q1: Screenshots are too slow

A: Lower the screenshot resolution, or use ADB's faster screenshot path (see the sketch below)
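One possible fast path, assuming adb is on PATH: adb exec-out screencap -p streams the PNG directly over USB instead of writing it to device storage first. The helper name below is illustrative:

import subprocess
import cv2
import numpy as np

def adb_screenshot():
    """Capture a screenshot via adb exec-out and decode it with OpenCV."""
    png_bytes = subprocess.run(
        ['adb', 'exec-out', 'screencap', '-p'],
        capture_output=True, check=True
    ).stdout
    # Returns a BGR image, or None if decoding failed
    return cv2.imdecode(np.frombuffer(png_bytes, dtype=np.uint8), cv2.IMREAD_COLOR)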

Q2: State recognition is inaccurate

A: Start with simple rules, and refine them after collecting data

Q3: Training is unstable

A: Adjust the learning rate, and update the target network more frequently

Q4: Out of memory

A: Reduce the buffer size, or sample from disk instead of RAM (see the sketch below)
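A rough sketch of sampling straight from an HDF5 episode file instead of holding every experience in memory; the class name and interface are illustrative, not part of the project:

import h5py
import numpy as np

class DiskBackedBuffer:
    """Sketch: sample batches directly from an HDF5 file written by ExperienceCollector."""

    def __init__(self, filepath):
        self.f = h5py.File(filepath, 'r')  # data stays on disk
        self.size = len(self.f['states'])

    def sample(self, batch_size=32):
        # h5py fancy indexing requires sorted, increasing indices
        idx = np.sort(np.random.choice(self.size, batch_size, replace=False))
        return (self.f['states'][idx], self.f['actions'][idx],
                self.f['rewards'][idx], self.f['next_states'][idx],
                self.f['dones'][idx])

    def close(self):
        self.f.close()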


End of document