游戏编程模式笔记——第十五章 事件队列

对消息或事件的发送与受理进行事件上的解耦。

动机

对于一个游戏而言维护它自身的事件队列作为其神经系统的主干是很常见的。你会常常听到“中心式”、“全局的”、“主要的”类似这样的描述。它被用于那些希望保持模块低耦合的游戏,起到游戏内部高级通信模块的作用。
假设你的游戏有一个新手教程,该新手教程会在完成指定的游戏事件后弹出帮助框。你的游戏玩法以及战斗相关的代码会很复杂。最后你想做的就是往这些复杂的代码里加入一系列检查以用于触发引导。当然你可以用一个中心事件队列来取而代之。游戏的任何一个系统都可以向它发送事件,于是战斗模块的代码可以在你每次消灭一个敌人后向该队列添加一个“敌人死亡”的事件。
相似的,游戏的任意系统都能从队列中“收取”事件。新手引导模块向事件队列注册自身,并向其声名该模块希望接收“敌人死亡”事件。借此,敌人死亡的消息可以在战斗系统和新手引导模块不进行交互的情况下在两者之间传递。
让我们往游戏中加入音乐,我们从最简易的方法入手来看看它是如何运作的。我们将向游戏中添加一个小的“音效引擎”,它包含根据标识和音量来播放音乐的API:

1
2
3
4
5
class Audio
{
public:
static void playSound(SoundId id, int volume);
};

这个类要做的是,根据SoundID加载对应的声音资源,提供可用的声道并开始讲它播放出来。

1
2
3
4
5
6
7
void Audio::playSound(SoundId id, int volume)
{
ResourceId resource = loadSound(id);
int channel = findOpenChannel();
if (channel == -1) return;
startSound(resource, channel, volume);
}

这样我们在UI代码中,菜单的选中项改变时我们可以播放一个小音效。
在此以后,我们注意到有时切换菜单项时,整个屏幕会卡几帧,这便遇到了我们需要解决的第一个问题。

  • 问题1:在音效引擎完全处理完播放请求前,API的调用一直阻塞着调用者

在AI代码中,我们增加一个调用来让怪物遭受玩家攻击时发出痛哭的哀嚎声。但是英雄攻击多个怪物时,多个声音混合在一起会叠加它们的声波,听起来很刺耳。硬件一次只能播放这么多声音。一旦并发量超过临界值,声音就会被忽略或中断。
为了处理这些问题,我们需要观察整个音效调用集合,并加以汇总和区分。不幸的是,我们的声音API每次仅单独处理一个“playSound()”函数。对整个音效调用集合的处理和穿针引线一样,一次一个。

  • 问题2:不能批量地处理请求

代码库中在许多不同的游戏系统中都涉及“playSound()”函数的调用。但是我们的游戏引擎运行在现代多核硬件上面。为了充分利用多核,我们将它们分配在不同的线程中。
但是代码中我们没有看到任何的线程同步。

  • 问题3:请求在错误的线程被处理

事件队列模式

事件队列是一个按照先进先出顺序存储一系列通知或请求的队列。发出通知时系统会将该请求置入队列并随即返回,请求处理器随后从事件队列中获取并处理这些请求,请求可由处理器直接处理或转交给其感兴趣的模块。这一模式对消息的发送者与受理者进行了解耦,使消息的处理变得动态且非实时。

使用情境

如果你只想对一条消息的发送者和接收者进行解耦,那么诸如观察者模式和命名模式都能以更低的复杂度满足你。需要在某个问题上对时间进行解耦时,一个队列往往足矣。
按照推送和拉取的方式思考:代码A希望另一个代码块B做一些事情。A发起这一请求最自然的方式就是将它推送给B。
同时,B在其自身的循环中适时地拉取该请求并进行处理也是十分自然的。当你具备推送端和拉取端之后,在两者之间需要一个缓冲。这正是缓冲队列比简单的解耦模式多出来的优势。
队列提供给拉取请求的代码块一些控制权:接收者可以延迟处理,聚合请求或者完全废弃它们。但这是通过“剥夺”发送者对队列的控制来实现的。所有的发送端能做的就是往队列里投递消息。这使得队列在发送端需要实时反馈时显得不适用。

使用须知

中心事件队列是个全局变量

该模式的一种普遍用法被称为“中央枢纽站”,游戏中所有模块的消息都可以通过它来传递。它是游戏中强大的基础设施,然而强大并不总意味着好用。
“关于全局变量是糟糕的”这点,当你有一些系统的任何部分都能访问的状态时,各种细小部分不知不觉产生了互相依赖。

游戏世界的状态任你掌控

世界需要不同种类的状态。我们需要死亡的实体,以便了解它有多难杀死。我们可能想要检查周围,看看附近其他的障碍物或爪牙。但如果事件到后来没有被接收到,则这些细节就会消失。实体可能会被释放,附件的其他敌人也会分散。
当你接收到一个事件,你要十分谨慎,不可认为当前世界的状态反应的是消息发出时世界的状态。这就意味着队列事件视图比同步系统的事件具有更重量级的数据结构。后者只需通知“某事发生了”然后接收者可以检查系统环境来深入细节,而使用队列时,这些细节必须在事件发生时被记录以便稍后处理消息时使用。

你会在反馈系统循环中绕圈子

任何一个事件或消息系统都得留意循环。
A发送一个事件。
B接收它,之后发送一个响应事件。
这个响应事件恰巧是A关心的,所以接收它。作为反馈A也会发送一个响应事件……
回到2.
当你的消息系统是同步的时,你很快就能发现死循环——它们会导致栈溢出并造成游戏崩溃。对于队列来说,异步的放开栈处理会使这些伪事件在系统中来回徘徊,但游戏可能会保持运行。一个常用的规避方法则是避免在处理事件端代码中发送事件。

示例代码

回顾之前Audio的代码,我们想推迟这些工作以便“playSound()”可以快速返回。为了实现,我们需要将播放声音的请求具体化。我们需要一些结构来存储待处理的请求,以便在后续保持请求的信息。

1
2
3
4
5
struct PlayMessage
{
SoundId id;
int volume;
};

我们需要给“Audio”类一些存储空间以便它可以追踪这些播放的消息,实践中数组总是存储一系列同结构事物的最佳方法。

  • 无动态分配
  • 没有为记录信息的存储额外产生开销或指针
  • 可缓存的连续存储空间
1
2
3
4
5
6
7
8
9
10
11
class Audio
{
public:
static void init() {numPending_ = 0;}

// Other stuff...
private:
static const int MAX_PENDING = 16;
static PlayMessage pending_[MAX_PENDING];
static int numPending_;
};

调节数值的大小来覆盖我们最坏的情况。为了播放声音,我们简单地在数组末尾放置一个新的位置:

1
2
3
4
5
6
7
8
void Audio::playSound(SoundId id, int volume)
{
assert(numPending_ < MAX_PENDING);

pending_[numPending_].id = id;
pending_[numPending_].volume = volume;
numPending++;
}

这让”playSound()”函数几乎能够即时返回,当然,我们仍然需要播放音乐。这段代码需要在某处运行,即”update()”方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Audio
{
public:
static void update()
{

for(int i = 0; i < numPending_; i++)
{
ResourceId resource = loadSound(
pending_[i].id);
int channel = findOpenChannel();
if (channel == -1) return;
startSound(resource, channel, pending_[i].volume);
}

numPending_ = 0;
}
// Other stuff...
};

现在我们需要在某处适时地调用它,“适时”意味着这取决于你的游戏。它可能在主游戏循环被调用,或者在一个专用的声音线程中被调用。

环状缓冲区

它保有数组所有的优点,同时允许我们从队列的前端持续地移除元素。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class Audio
{
public:
static void init()
{

head_ = 0;
tail_ = 0;
}

// Method...
private:
static int head_;
static int tail_;
// Array...
};

在”playSound()”函数实现中,”numPending“被替换成”tail“,其他地方是一样的:

1
2
3
4
5
6
7
8
9
void Audio::playSound(SoundId id, int volume)
{
assert(tail_ < MAX_PENDING);

// Add to the end of the list.
pending_[tail_].id = id;
pending_[tail_].volume = volume;
tail_++;
}

更有趣的变化在”update()”函数中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void Audio::update()
{
// If there are no pending requests, do nothing.
if (head_ == tail_) return;

ResourceId resource = loadSound(
pending_[head_].id);
int channel = findOpenChannel();
if (channel == -1) return;

startSound(resource, channel,
pending_[head_].volume);
head++;
}

我们会处理队列头部的请求,并通过移动头指针来废弃它。通过检查头尾之间的距离是否为0来检测空队列。
当队尾移动到最后时,我们要做的就是把尾部到绕回到头部。这就是为什么它叫做环状缓冲区——它运转起来像个圆形细胞阵列。

1
2
3
4
5
6
7
8
9
void Audio::playSound(SoundId id, int volume)
{
assert((tail_ + 1) % MAX_PENDING != head_);

// Add to the end of the list.
pending_[tail_].id = id;
pending_[tail_].volume = volume;
tail_ = (tail_ + 1) % MAX_PENDING;
}

assert操作确保了一旦队列被填满了,不会发生头尾相吞并产生覆盖的情况。
在”update()”函数中,我们同样对头部做了绕回的处理:

1
2
3
4
5
6
7
8
9
10
11
12
void Audio::update()
{
// If there are no pending requests, do nothing.
if (head_ == tail_) return;
ResourceId resource = loadSound( pending_[head_].id);

int channel = findOpenChannel();
if (channel == -1) return;
startSound(resource, channel, pending_[head_].volume);

head_ = (head + 1) % MAX_PENDING;
}

如果最大容量会有问题,你可以使用可增长的数组。当队列满了以后,分配一个新的数组,大小是当前数组的两倍,并把原数组的项拷贝过去。

汇总请求

多个播放相同音乐的请求会导致声音过大。由于我们能够获知当前正在等候处理的是哪个请求,所以需要做的就是将与当前等待处理的请求相符(播放同一个音乐)的请求进行合并:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void Audio::playSound(SoundId, int volume)
{
// Walk the pending requests.
for(int i = head_; i != tail_; i = (i + 1) % MAX_PENDING)
{
if (pending_[i].id == id)
{
// Use the larger of the two volumes.
pending_[i].volume = max(volume, pending_[i].volume);

// Don't need to enqueue.
return;
}
}

// Previous code...
}

“汇总”是相当初步的,但我们可以用同样的想法批量处理更多有趣的事情。
这里有一些重要的事情必须记住。我们可汇总的“同步发生”的请求数量只和队列一般大小。如果我们更快地处理请求,队列尺寸保持很小,那么可以批量处理请求的机会就较小。同样,如果处理请求滞后,队列被填满,我们将会发现更多的崩溃。
这种模式将请求方与请求被处理的时间进行隔离,但是当你把整个队列作为一个动态的数据结构去操作时,请求提出和处理请求之间的滞后会显著地影响系统表现。所以,确认这么做之前你已准备好了。

跨越线程

由于我们有三点严格的要求,所以在线程上做文章并不难。

  • 请求声音的代码和播放器声音已解耦。
  • 两者之间有一个队列来封送处理。
  • 队列从程序的其余部分中被单独封装出来。

我们所需要做的是保证队列不被同步修改。”playSound()”函数做的工作量非常小——基本上就是分配一些字段的空间——因此可以在很短的时间内阻塞处理进程的同时锁住它。在”update()”函数中,我们等待某个条件变量以免消耗CPU周期,直到有请求需要处理。

设计决策

入队的是什么

“事件”和“消息”总是被我替换着使用,因为这无伤大雅。无论你往队列里塞什么,它都具备相同的解耦与聚合能力,但二者仍然有一些概念上的不同。
如果队列中是事件
一个“事件”或“通知”描述已经发生的事情,比如“怪物死亡”。你将它入队,所以其他对象可以响应事件,有几分像一个异步的观察者模式。

  • 你可能会允许多个监听器。由于队列包含的事件已经发生,因此发送者不关心谁会接收到它。从这个角度来看,这个事件已经过去并且已经被忘记了。
  • 可访问队列的域往往更广。事件队列经常用于给任何和所有感兴趣的部分广播事件。为了允许感兴趣的部分有最大的灵活性,这些队列往往有更多的全局可见性。

如果队列中是消息
一个“消息”或“请求”描述一种“我们期望”发生在“将来”的行为,类似于“播放音乐”。你可以认为这一个异步API服务。

  • 你更可能只有单一的监听器。就像示例中队列的消息专门用于请求播放声音,如果游戏的其他任何开始从队列中偷窃消息,那并不会起到好的作用。

谁能从队列中读取

在用户界面接口的事件系统中,你可以随心地注册监听器。你有时会耳闻术语“单播(single-cast)”和“广播(broadcast)”以进行区别,这两者都很有用。
单播队列
当一个队列是一个类的API本身的一部分时,单播就再合适不过了。类似我们的声音示例,站在调用者角度,他们能调用的只是一个”playSound()”方法。

  • 队列成为读取者的实现细节。所有的发送者知道的只是它发送了一条消息。
  • 队列被更多地封装。所有其他条件相同的情况下,更多的封装通常是更好的。
  • 你不必担心多个监听器竞争的情况。在多个监听者的情况下,你不得不决定他们是否都获取队列中的每一项(广播)或是否队列中的每一项都只打包分配给一个监听器(更像一个工作队列)。

监听器可能会做重复的工作或者互相干扰,对于一个单一的监听器,这种复杂性会消失。

广播队列
这是大多数“事件”系统所做的事情。当一个事件进来时,如果你有是个监听器,则它们都能看见该事件。

  • 事件可以被删除。在大多数的广播系统中,如果某一时刻处理事件没有监听器,那么事件就会被废弃。
  • 可能需要过滤事件。广播队列通常是在系统内大范围可见的,而且最终你会有大量的监听器。大量事件乘以大量监听器,于是你将调用大量的事件句柄。为了缩减规模,大部分广播事件系统会让一个监听器过滤它们收到的事件集合。

工作队列
类似于一个广播队列,此时你也有多个监听器。不同的是队列中的每一项只会被投递到一个监听器中。这是一种对于并发线程支持不好的系统常见的工作分配模式。

  • 你必须做好规划。因为一个项目只投递给一个监听器,队列逻辑需要找出最好的选择。这可能是简单循环或随机循环,或者是一些更复杂的优先级系统。

谁可以写入队列

该模式适用于所有可能的读/写配置:一对一,一对多,多对一,多对多。
一个写入者

  • 你隐式地知道事件的来源。因为只有一个对象可以向队列添加事件,任何监听器可以安全地假设事件来自该发送者。
  • 通常允许多个读取者。

多个写入者

  • 你必须小心反馈循环。因为任何东西都可能放到队列中,处理事情期间很可能突然入列一些东西。如果你不小心,可能会触发反馈循环。
  • 你可能会想要一些发送方在事件本身的引用。 如果你将发送方的引用打包进事件对象,监听器就可以知道是谁发送的。

队列中对象的生命周期是什么

对于一个队列,消息生存于入列调用之外。
转移所有权
这是手动管理内存时的一种传统方法。当一个消息排队时,队列声明它,发送者不再拥有它。当消息处理时,接收者取走所有权并负责释放它。
共享所有权
当前cpp程序员能更舒服地进行垃圾回收了,但分享所有权会更容易接受。这样只要任何事情对它有一个引用,消息就会依然存在。当被忘记时它就会自动释放(参考shared_ptr)。
队列拥有它
不用自己释放消息,发送者会从队列中请求一个新的消息。队列返回一个已经存在于队列内存的消息引用,接着发送者会填充队列。消息处理时,接收者参考队列中相同消息的操作。