
第24章 Spring Cloud开发
Spring是JavaEE的一个轻量级开发框架,主营IoC和AOP,集成JDBC、ORM、MVC等功能便于开发。
Spring Boot是基于Spring,提供开箱即用的积木式组件,目的是提升开发效率。
那么Spring Cloud是啥?
Spring Cloud顾名思义是跟云相关的,云程序实际上就是指分布式应用程序,所以Spring Cloud就是为了让分布式应用程序编写更方便,更容易而提供的一组基础设施,它的核心是Spring框架,利用Spring Boot的自动配置,力图实现最简化的分布式应用程序开发。
Spring Cloud包含了一大堆技术组件,既有开源社区开发的组件,也有商业公司开发的组件,既有持续更新迭代的组件,也有即将退役不再维护的组件。
本章会介绍如何基于Spring Cloud创建分布式应用程序,但并不会面面俱到地介绍所有组件,而是挑选几个核心组件,演示如何构造一个基本的分布式应用程序。
24.1 项目架构设计
我们的目标是以Spring Cloud为基础,从零开始搭建一个7x24小时运行的证券交易所。
除了Spring Cloud外,通常项目还需要依赖数据库、消息系统、缓存等各种组件。我们选择组件的原则是通用性高,使用广泛,因此,数据库选择MySQL 8.x,消息系统选择Kafka 3.x,缓存系统选择Redis 6.x。
由于我们的项目是一个7x24小时运行的证券交易系统,因此,我们简单分析一下业务系统的特点:
- 证券交易系统的交易是基于交易对,例如,BTC/USD交易对表示用USD购买BTC,USD是计价货币(Quote Asset),BTC是交易资产(Base Asset);
- 证券交易系统通过买卖双方各自的报价,按照价格优先、时间优先的顺序,对买卖双方进行撮合,实现每秒成千上万的交易量,可以为市场提供高度的流动性和基于微观的价格发现机制。
为了简化设计,我们把项目需求限定如下:
- 仅支持BTC/USD一个交易对;
- 不收取手续费,简化了收费逻辑;
- 暂不考虑与银行和区块链系统对接,简化了资产的存取;
- 暂不考虑风控相关的需求,以便专注于核心业务系统的开发;
- 仅提供Web操作界面,暂不提供手机App;
- 暂无后台管理功能。
项目名称暂定为Warp Exchange,采用GPL v3授权协议。项目最终完成后,效果如下:
系统模块
对一个系统来说,建立一个简单可靠的模型,不但能大大简化系统的设计,而且能以较少的代码实现一个稳定运行的系统,最大限度地减少各种难以预测的错误。
我们来看证券交易系统的业务模型。
对于证券交易系统来说,其输入是所有交易员发送的买卖订单。系统接收到订单后,内部经过定序,再由撮合引擎进行买卖撮合,最后对成交的订单进行清算,买卖双方交换Base和Quote资产,即完成了交易。
在撮合成交的过程中,系统还需要根据成交价格、成交数量以及成交时间,对成交数据进行聚合,以便交易员能直观地以K线图的方式看到历史交易数据,因此,行情系统也是证券交易系统的一部分。此外,推送系统负责将行情、订单成交等事件推送给客户端。
最后,证券交易系统还需要给交易员提供一个操作界面,通常是Web或手机App。UI系统将在内部调用API,因此,API才是整个系统下单和撤单的唯一入口。
整个系统从逻辑上可以划分为如下模块:
- API模块(Trading API),交易员下单、撤单的API入口;
- 定序模块(Sequencer),用于对所有收到的订单进行定序;
- 交易引擎(Trading Engine),对定序后的订单进行撮合、清算;
- 行情模块(Quotation),将撮合输出的成交信息汇总,形成K线图;
- 推送模块(Push),将市场行情、交易结果、资产变化等信息以WebSocket等途径推送给用户;
- UI模块(UI),给交易员提供一个Web操作界面,并把交易员的操作转发给后端API。
以上各模块关系如下:
query ┌───────────────────────────┐ │ │ │ ▼┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐│ Client │──▶│ API │──▶│Sequencer│──▶│ Engine │└─────────┘ └─────────┘ └─────────┘ └─────────┘ ▲ │ │ │┌─────────┐ ┌─────────┐ ││ Browser │──▶│ UI │ │└─────────┘ └─────────┘ │ ▲ ▼ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ └────────│WebSocket│◀──│ Push │◀──│Quotation│ └─────────┘ └─────────┘ └─────────┘
其中,交易引擎作为最核心的模块,我们需要仔细考虑如何设计一个简单可靠,且模块化程度较高的子系统。对证券交易系统来说,交易引擎内部可划分为:
- 资产模块:管理用户的资产;
- 订单模块:管理用户的活动订单(即尚未完全成交且未取消的订单);
- 撮合引擎:处理买卖订单,生成成交信息;
- 清算模块:对撮合引擎输出的成交信息进行清算,使买卖双方的资产进行交换。
交易引擎是一个以事件驱动为核心的系统,它的输入是定序后的一个个事件,输出则是撮合结果、市场行情等数据。交易引擎内部各模块关系如下:
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌─────────┐ ┌─────────┐──┼─▶│ Order │───▶│ Match │ │ └─────────┘ └─────────┘ │ │ │ │ │ │ │ ▼ ▼ │ ┌─────────┐ ┌─────────┐ │ │ Asset │◀───│Clearing │ │ └─────────┘ └─────────┘ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
经过这样的模块化设计,一个证券交易系统就具备了基本的雏型。
24.2 搭建项目框架
对于Warp Exchange项目,我们以Maven为构建工具,把每个模块作为一个Maven的项目管理,并抽取出公共逻辑放入common
模块,结构如下:
- common:公共代码;
- config:配置服务器;
- push:推送服务;
- quotation:行情服务;
- trading-api:交易API服务;
- trading-engine:交易引擎;
- trading-sequencer:定序服务;
- ui:用户Web界面。
为了简化版本和依赖管理,我们用parent
模块管理最基础的pom.xml
,其他模块直接从parent
继承,能大大简化各自的pom.xml
。parent
模块pom.xml
内容如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.itranswarp.exchange</groupId> <artifactId>parent</artifactId> <version>1.0</version> <packaging>pom</packaging>
<!-- 继承自SpringBoot Starter Parent --> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <!-- SpringBoot版本 --> <version>3.0.0</version> </parent>
<properties> <!-- 项目版本 --> <project.version>1.0</project.version> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!-- Java编译和运行版本 --> <maven.compiler.source>17</maven.compiler.source> <maven.compiler.target>17</maven.compiler.target> <java.version>17</java.version>
<!-- 定义第三方组件的版本 --> <pebble.version>3.2.0</pebble.version> <springcloud.version>2022.0.0</springcloud.version> <springdoc.version>2.0.0</springdoc.version> <vertx.version>4.3.1</vertx.version> </properties>
<!-- 引入SpringCloud依赖 --> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${springcloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
<!-- 共享的依赖管理 --> <dependencies> <!-- 依赖JUnit5 --> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-api</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-params</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter-engine</artifactId> <scope>test</scope> </dependency> <!-- 依赖SpringTest --> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-test</artifactId> <scope>test</scope> </dependency> </dependencies>
<build> <pluginManagement> <plugins> <!-- 引入创建可执行Jar的插件 --> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </pluginManagement> </build></project>
上述pom.xml
中,除了写死的Spring Boot版本、Java运行版本、项目版本外,其他引入的版本均以<xxx.version>1.23</xxx.version>
的形式定义,以便后续可以用${xxx.version}
引用版本号,避免了同一个组件出现多个写死的版本定义。
对其他业务模块,引入parent
的pom.xml
可大大简化配置。以ui
模块为例,其pom.xml
如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<!-- 指定Parent --> <parent> <groupId>com.itranswarp.exchange</groupId> <artifactId>parent</artifactId> <version>1.0</version> <!-- Parent POM的相对路径 --> <relativePath>../parent/pom.xml</relativePath> </parent>
<!-- 当前模块名称 --> <artifactId>ui</artifactId>
<dependencies> <!-- 依赖SpringCloud Config客户端 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-config</artifactId> </dependency>
<!-- 依赖SpringBoot Actuator --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>
<!-- 依赖Common模块 --> <dependency> <groupId>com.itranswarp.exchange</groupId> <artifactId>common</artifactId> <version>${project.version}</version> </dependency>
<!-- 依赖第三方模块 --> <dependency> <groupId>io.pebbletemplates</groupId> <artifactId>pebble-spring-boot-starter</artifactId> <version>${pebble.version}</version> </dependency> </dependencies>
<build> <!-- 指定输出文件名 --> <finalName>${project.artifactId}</finalName> <!-- 创建SpringBoot可执行jar --> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build></project>
因为我们在parent
的pom.xml
中引入了Spring Cloud的依赖管理,因此,无需指定相关组件的版本。只有我们自己编写的组件和未在Spring Boot和Spring Cloud中引入的组件,才需要指定版本。
最后,我们还需要一个build
模块,把所有模块放到一起编译。建立build
文件夹并创建pom.xml
如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.itranswarp.exchange</groupId> <artifactId>build</artifactId> <version>1.0</version> <packaging>pom</packaging> <name>Warp Exchange</name>
<!-- 按相对路径列出所有模块 --> <modules> <module>../common</module> <module>../config</module> <module>../parent</module> <module>../push</module> <module>../quotation</module> <module>../trading-api</module> <module>../trading-engine</module> <module>../trading-sequencer</module> <module>../ui</module> </modules></project>
我们还需要创建目录config-repo
来存储Spring Cloud Config服务器端的配置文件。
最后,将所有模块导入IDE,可正常开发、编译、运行。如果要在命令行模式下运行,进入build
文件夹使用Maven编译即可:
warpexchange $ cd build && mvn clean package
本地开发环境
在本地开发时,我们需要经常调试代码。除了安装JDK,选择一个IDE外,我们还需要在本地运行MySQL、Redis、Kafka,以及Kafka依赖的ZooKeeper服务。
考虑到手动安装各个服务在不同操作系统下的差异,以及初始化数据非常麻烦,我们使用Docker Desktop来运行这些基础服务,需要在build
目录下编写一个docker-compose.yml
文件定义我们要运行的所有服务:
version: "3"services: zookeeper: image: bitnami/zookeeper:3.5 container_name: zookeeper ports: - "2181:2181" environment: - ALLOW_ANONYMOUS_LOGIN=yes volumes: - "./docker/zookeeper-data:/bitnami"
kafka: image: bitnami/kafka:3.0 container_name: kafka ports: - "9092:9092" depends_on: - zookeeper environment: - KAFKA_BROKER_ID=1 - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true - ALLOW_PLAINTEXT_LISTENER=yes volumes: - "./docker/kafka-data:/bitnami"
redis: image: redis:6.2 container_name: redis ports: - "6379:6379" volumes: - "./docker/redis-data:/data"
mysql: image: mysql:8.0 container_name: mysql ports: - "3306:3306" command: --default-authentication-plugin=mysql_native_password environment: - MYSQL_ROOT_PASSWORD=password volumes: - "./sql/schema.sql:/docker-entrypoint-initdb.d/1-schema.sql:ro" - "./docker/mysql-data:/var/lib/mysql"
在上述docker-compose.yml
文件中,我们定义了MySQL、Redis、Kafka以及Kafka依赖的ZooKeeper服务,各服务均暴露标准端口,且MySQL的root
口令设置为password
,第一次启动MySQL时,使用sql/schema.sql
文件初始化数据库表结构。所有数据盘均挂载到build
目录下的docker
目录。
在build
目录下运行docker-compose up -d
即可启动容器:
build $ docker-compose up -dCreating network "build_default" with the default driverCreating zookeeper ... doneCreating mysql ... doneCreating redis ... doneCreating kafka ... done
在Docker Desktop中可看到运行状态:
如果要删除开发环境的所有数据,首先停止运行Docker容器进程并删除,然后删除build
目录下的docker
目录,重新运行docker-compose
即可。
Spring Cloud Config
Spring Cloud Config是Spring Cloud的一个子项目,它的主要目的是解决多个Spring Boot应用启动时,应该如何读取配置文件的问题。
对于单体应用,即一个独立的Spring Boot应用,我们会把配置写在application.yml
文件中。如果配置需要针对多个环境,可以用---
分隔并标注好环境:
## application.yml## 通用配置:spring: datasource: url: jdbc:mysql://localhost/test
---
## test profile:spring: config: activate: on-profile: test datasource: url: jdbc:mysql://172.16.0.100/test
这种配置方式针对单个Spring Boot应用是可行的,但是,针对分布式应用,有多个Spring Boot应用需要启动时,分散在各个应用中的配置既不便于管理,也不便于复用相同的配置。
Spring Cloud Config提供了一个通用的分布式应用的配置解决方案。它把配置分为两部分:
- Config Server:配置服务器,负责读取所有配置;
- Config Client:嵌入到各个Spring Boot应用中,本地无配置信息,启动时向服务器请求配置。
我们先来看看如何搭建一个Spring Cloud Config Server,即配置服务器。
首先,在config
模块中引入spring-cloud-config-server
依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-config-server</artifactId></dependency>
然后,编写一个ConfigApplication
入口,标注@EnableConfigServer
:
@EnableConfigServer@SpringBootApplicationpublic class ConfigApplication { public static void main(String[] args) { SpringApplication.run(ConfigApplication.class, args); }}
最后,在application.yml
中设置如何搜索配置。Spring Cloud Config支持多种配置方式,包括从本地文件、Git仓库、数据库等多个地方读取配置。这里我们选择以本地文件的方式读取配置文件,这也是最简单的一种配置方式:
## 配置服务器的端口,通常设置为8888:server: port: 8888
spring: application: name: config-server profiles: # 从文件读取配置时,Config Server激活的profile必须设定为native: active: native cloud: config: server: native: # 设置配置文件的搜索路径: search-locations: file:./config-repo, file:../config-repo, file:../../config-repo
在config-repo
目录下,存放的就是一系列配置文件:
config-repo/├── application-default.yml├── application-test.yml├── application.yml├── push.yml├── quotation.yml├── trading-api.yml├── trading-engine.yml├── trading-sequencer.yml├── ui-default.yml└── ui.yml
至此,配置服务器就完成了,直接运行ConfigApplication
即可启动配置服务器。在开发过程中,保持配置服务器在后台运行即可。
接下来,对于每个负责业务的Spring Boot应用,我们需要从Spring Cloud Config Server读取配置。读取配置并不是说本地零配置,还是需要一点基础配置信息。以ui
项目为例,编写application.yml
如下:
spring: application: # 设置app名称: name: ui config: # 导入Config Server地址: import: configserver:${CONFIG_SERVER:http://localhost:8888}
上述默认的Config Server配置为http://localhost:8888
,也可以通过环境变量指定Config Server的地址。
下一步是在ui
模块的pom.xml
中添加依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-config</artifactId></dependency>
接下来正常启动UIApplication
,该应用就会自动从Config Server读取配置。由于我们指定了应用的名称是ui
,且默认的profile
是default
,因此,Config Server将返回以下4个配置文件:
- ui-default.yml
- application-default.yml
- ui.yml
- application.yml
前面的配置文件优先级较高,后面的配置文件优先级较低。如果出现相同的配置项,则在优先级高的配置生效。
我们可以在浏览器访问http://localhost:8888/ui/default
看到Config Server返回的配置,它是一个JSON文件:
{ "name": "ui", "profiles": [ "default" ], "label": null, "version": null, "state": null, "propertySources": [ { "name": "file:../config-repo/ui-default.yml", "source": {...} }, { "name": "file:../config-repo/application-default.yml", "source": {...} }, { "name": "file:../config-repo/ui.yml", "source": {...} }, { "name": "file:../config-repo/application.yml", "source": {...} } ]}
如果我们启动UIApplication
时传入SPRING_PROFILES_ACTIVE=test
,将profile设置为test
,则Config Server返回的文件如下:
- ui-test.yml
- application-test.yml
- ui.yml
- application.yml
可以通过http://localhost:8888/ui/test
查看返回的配置。由于文件ui-test.yml
不存在,因此,实际配置由3个文件合并而成。
我们可以很容易地看到,一个Spring Boot应用在启动时,首先要设置自己的name
并导入Config Server的URL,再根据当前活动的profile
,由Config Server返回多个配置文件:
- {name}-{profile}.yml
- application-{profile}.yml
- {name}.yml
- application.yml
其中,{name}-{xxx}.yml
是针对某个应用+某个profile
的特定配置,{name}.yml
是针对某个应用+所有profile的配置,application-{profile}.yml
是针对某个profile
的全局配置,application.yml
是所有应用的全局配置。搭配各种配置文件就可以灵活组合配置。一般来说,全局默认的配置放在application.yml
中,例如数据库连接:
spring: datasource: url: jdbc:mysql://localhost/test
这样保证了默认连接到本地数据库,在生产环境中会直接报错而不是连接到错误的数据库。
在生产环境,例如profile
设置为prod
,则可以将数据库连接写在application-prod.yml
中,使得所有生产环境的应用读取到的数据库连接是一致的:
spring: datasource: url: jdbc:mysql://172.16.0.100/prod_db
某个应用自己特定的配置则应当放到{name}.yml
和{name}-{profile}.yml
中。
在设置好各个配置文件后,应当通过浏览器检查Config Server返回的配置是否符合预期。
Spring Cloud Config还支持配置多个profile,以及从加密的配置源读取配置等。如果遇到更复杂的需求,可参考Spring Cloud Config的文档。
环境变量
需要特别注意,在config-repo
的配置文件里,使用的环境变量,不是Config Server的环境变量,而是具体某个Spring Boot应用的环境变量。
我们举个例子:假定ui.yml
定义如下:
server: port: ${APP_PORT:8000}
当UIApplication
启动时,它获得的配置为server.port=${APP_PORT:8000}
。Config Server不会替换任何环境变量,而是将它们原封不动地返回给UIApplication
,由UIApplication
根据自己的环境变量解析后获得最终配置。如果我们启动UIApplication
时传入环境变量:
$ java -DAPP_PORT=7000 -jar ui.jar
则UIApplication
最终读取的配置server.port
为7000
。
可见,使用Spring Cloud Config时,读取配置文件步骤如下:
- 启动XxxApplication时,读取自身的
application.yml
,获得name
和Config Server地址; - 根据
name
、profile
和Config Server地址,获得一个或多个有优先级的配置文件; - 按优先级合并配置项;
- 如果配置项中存在环境变量,则使用Xxx应用本身的环境变量去替换占位符。
环境变量通常用于配置一些敏感信息,如数据库连接口令,它们不适合明文写在config-repo
的配置文件里。
常见错误
启动一个Spring Boot应用时,如果出现Unable to load config data
错误:
java.lang.IllegalStateException: Unable to load config data from 'configserver:http://localhost:8888' at org.springframework.boot.context.config.StandardConfigDataLocationResolver.getReferences at ...
需要检查是否在pom.xml
中引入了spring-cloud-starter-config
,因为没有引入该依赖时,应用无法解析本地配置的import: configserver:xxx
。
如果在启动一个Spring Boot应用时,Config Server没有运行,通常错误信息是因为没有读取到配置导致无法创建某个Bean。
参考源码
小结
我们以Spring Boot为基础,并通过Maven的模块化配置搭建了项目的基本结构,依赖的基础组件通过Docker Desktop运行并初始化数据。对于多个服务组成的分布式应用来说,使用Spring Cloud Config可满足应用的配置需求。
24.3 设计交易引擎
一个完整的交易引擎包括资产系统、订单系统、撮合引擎和清算系统。
资产系统不仅记录了每个用户的所有资产,而且还要根据业务随时冻结和解冻用户资产。例如,下买单时,根据买入价格和买入数量,计算需要冻结的USD,然后对用户的可用USD进行冻结。
订单系统跟踪所有用户的所有订单。
撮合引擎是交易引擎中最重要的一个组件,它根据价格优先、时间优先的原则,对买卖订单进行匹配,匹配成功则成交,匹配不成功则放入订单簿等待后续成交。
清算系统则是处理来自撮合引擎的撮合结果。
┌ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┐ ┌─────────┐ ┌─────────┐Order Request ──┼─▶│ Order │───▶│ Match │ │ └─────────┘ └─────────┘ │ │ │ │ │ │ │ ▼ ▼ │ ┌─────────┐ ┌─────────┐ │ │ Asset │◀───│Clearing │ │ └─────────┘ └─────────┘ └ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
最后,把上述几个组件组合起来,我们就得到了一个完善的交易引擎。
我们观察交易引擎的输入,它是一系列确定的订单序列,而交易引擎的输出则是成交信息。与此同时,交易引擎本身是一个确定性的状态机,它的内部状态包括订单集、资产表和订单簿。每当一个新的订单请求被输入后,状态机即更新状态,然后输出成交信息。
注意到交易引擎在任何一个时刻的状态都是确定的,在一个确定的状态下,继续给定一个确定的订单请求,下一个状态也是确定的,即:
交易引擎当前状态是Sn,则下一个输入On+1会将其状态更新为Sn+1。
因此,对于一组给定的输入订单集合[O1, O2, O3, …],交易引擎每次内部状态的更新和输出都是完全确定的,与时间无关。
我们换句话说,就是给定一组订单输入的集合,让一个具有初始状态的交易引擎去执行,获得的结果集为[R1, R2, R3, …],把同样的一组订单输入集合让另一个具有初始状态的交易引擎去执行,获得的结果集完全相同。
因此,要实现交易引擎的集群,可以同时运行多个交易引擎的实例,然后对每个实例输入相同的订单请求序列,就会得到完全相同的一组输出:
┌──────┐ ┌─▶│Engine│──▶ R1, R2, R3... │ └──────┘O1, O2, O3... ──┤ │ ┌──────┐ └─▶│Engine│──▶ R1, R2, R3... └──────┘
可见,交易引擎是一个事件驱动的状态机。
实现交易引擎有多种方式,例如,把资产、订单等放入数据库,基于数据库事务来保证交易完整性,这种方式的缺点就是速度非常慢,TPS很低。
也可以把全部组件放在内存中,这样能轻松实现一个高性能的交易引擎,但内存的易失性会导致宕机重启后丢失交易信息,因此,基于内存的交易引擎必须要解决数据的持久化问题。
在Warp Exchange项目中,我们将实现一个完全基于内存的交易引擎。
24.3.1 设计资产系统
在交易系统中,用户资产是指用户以各种方式将USD、BTC充入交易所后的余额。本节我们来实现一个用户资产系统。
用户在买入BTC时,需要花费USD,而卖出BTC后,获得USD。当用户下单买入时,系统会先冻结对应的USD金额;当用户下单卖出时,系统会先冻结对应的BTC。之所以需要有冻结这一操作,是因为判断能否下单成功,是根据用户的可用资产判断。每下一个新的订单,就会有一部分可用资产被冻结,因此,用户资产本质上是一个由用户ID和资产ID标识的二维表:
用户ID | 资产ID | 可用 | 冻结 |
---|---|---|---|
101 | USD | 8900.3 | 1200 |
101 | BTC | 500 | 0 |
102 | USD | 12800 | 0 |
103 | BTC | 0 | 50 |
上述二维表有一个缺陷,就是对账很困难,因为缺少了一个关键的负债账户。对任何一个资产管理系统来说,要时刻保证整个系统的资产负债表为零。
对交易所来说,用户拥有的USD和BTC就是交易所的系统负债,只需引入一个负债账户,记录所有用户权益,就可以保证整个系统的资产负债表为零。假设负债账户以ID为1的系统用户表示,则用户资产表如下:
用户ID | 资产ID | 可用 | 冻结 |
---|---|---|---|
1 | USD | -22900.3 | 0 |
1 | BTC | -550 | 0 |
101 | USD | 8900.3 | 1200 |
101 | BTC | 500 | 0 |
102 | USD | 12800 | 0 |
103 | BTC | 0 | 50 |
引入了负债账户后,我们就可以定义资产的数据结构了。
在数据库中,上述表结构就是资产表的结构,将用户ID和资产ID标记为联合主键即可。
但是在内存中,我们怎么定义资产结构呢?
可以使用一个两层的ConcurrentMap
定义如下:
// 用户ID -> (资产ID -> Asset)ConcurrentMap<Long, ConcurrentMap<AssetEnum, Asset>> userAssets = new ConcurrentHashMap<>();
第一层Map
的Key是用户ID,第二层Map
的Key是资产ID,这样就可以用Asset
结构表示资产:
public class Asset { // 可用余额: BigDecimal available; // 冻结余额: BigDecimal frozen;
public Assets() { this(BigDecimal.ZERO, BigDecimal.ZERO); }
public Assets(BigDecimal available, BigDecimal frozen) { this.available = available; this.frozen = frozen; }}
下一步,我们在AssetService
上定义对用户资产的操作。实际上,所有资产操作只有一种操作,即转账。转账类型可用Transfer
定义为枚举类:
public enum Transfer { // 可用转可用: AVAILABLE_TO_AVAILABLE, // 可用转冻结: AVAILABLE_TO_FROZEN, // 冻结转可用: FROZEN_TO_AVAILABLE;}
转账操作只需要一个tryTransfer()
方法,实现如下:
public boolean tryTransfer(Transfer type, Long fromUser, Long toUser, AssetEnum assetId, BigDecimal amount, boolean checkBalance) { // 转账金额不能为负: if (amount.signum() < 0) { throw new IllegalArgumentException("Negative amount"); } // 获取源用户资产: Asset fromAsset = getAsset(fromUser, assetId); if (fromAsset == null) { // 资产不存在时初始化用户资产: fromAsset = initAssets(fromUser, assetId); } // 获取目标用户资产: Asset toAsset = getAsset(toUser, assetId); if (toAsset == null) { // 资产不存在时初始化用户资产: toAsset = initAssets(toUser, assetId); } return switch (type) { case AVAILABLE_TO_AVAILABLE -> { // 需要检查余额且余额不足: if (checkBalance && fromAsset.available.compareTo(amount) < 0) { // 转账失败: yield false; } // 源用户的可用资产减少: fromAsset.available = fromAsset.available.subtract(amount); // 目标用户的可用资产增加: toAsset.available = toAsset.available.add(amount); // 返回成功: yield true; } // 从可用转至冻结: case AVAILABLE_TO_FROZEN -> { if (checkBalance && fromAsset.available.compareTo(amount) < 0) { yield false; } fromAsset.available = fromAsset.available.subtract(amount); toAsset.frozen = toAsset.frozen.add(amount); yield true; } // 从冻结转至可用: case FROZEN_TO_AVAILABLE -> { if (checkBalance && fromAsset.frozen.compareTo(amount) < 0) { yield false; } fromAsset.frozen = fromAsset.frozen.subtract(amount); toAsset.available = toAsset.available.add(amount); yield true; } default -> { throw new IllegalArgumentException("invalid type: " + type); } };}
除了用户存入资产时,需要调用tryTransfer()
并且不检查余额,因为此操作是从系统负债账户向用户转账,其他常规转账操作均需要检查余额:
public void transfer(Transfer type, Long fromUser, Long toUser, AssetEnum assetId, BigDecimal amount) { if (!tryTransfer(type, fromUser, toUser, assetId, amount, true)) { throw new RuntimeException("Transfer failed"); }}
冻结操作可在tryTransfer()
基础上封装一个方法:
public boolean tryFreeze(Long userId, AssetEnum assetId, BigDecimal amount) { return tryTransfer(Transfer.AVAILABLE_TO_FROZEN, userId, userId, assetId, amount, true);}
解冻操作实际上也是在tryTransfer()
基础上封装:
public void unfreeze(Long userId, AssetEnum assetId, BigDecimal amount) { if (!tryTransfer(Transfer.FROZEN_TO_AVAILABLE, userId, userId, assetId, amount, true)) { throw new RuntimeException("Unfreeze failed"); }}
可以编写一个AssetServiceTest
,测试各种转账操作:
public class AssetServiceTest { @Test void tryTransfer() { // TODO... }}
并验证在任意操作后,所有用户资产的各余额总和为0
。
最后是问题解答:
为什么不使用数据库?
因为我们要实现的交易引擎是100%全内存交易引擎,因此所有用户资产均存放在内存中,无需访问数据库。
为什么要使用ConcurrentMap?
使用ConcurrentMap
并不是为了让多线程并发写入,因为AssetService
中并没有任何同步锁。对AssetService
进行写操作必须是单线程,不支持多线程调用tryTransfer()
。
但是读取Asset支持多线程并发读取,这也是使用ConcurrentMap
的原因。如果改成HashMap
,根据不同JDK版本的实现不同,多线程读取HashMap
可能造成死循环(注意这不是HashMap
的bug),必须引入同步机制。
如何扩展以支持更多的资产类型?
我们在AssetEnum
中以枚举方式定义了USD和BTC两种资产,如果要扩展到更多资产类型,可以以整型ID作为资产ID,同时需要管理一个资产ID到资产名称的映射,这样可以在业务需要的时候更改资产名称。
参考源码
小结
本节我们讨论并实现了一个基于内存的高性能的用户资产系统,其核心只有一个tryTransfer()
转账方法,业务逻辑非常简单。
24.3.2 设计订单系统
上一节我们实现了一个资产系统,本节我们来设计并实现一个订单系统。
订单系统的目的是为了管理所有的活动订单,并给每个新订单一个递增的序列号。由于在创建订单时需要冻结用户资产,因此,我们定义的OrderService
会引用AssetService
:
public class OrderService { // 引用AssetService: final AssetService assetService;
public OrderService(@Autowired AssetService assetService) { this.assetService = assetService; }}
一个订单由订单ID唯一标识,此外,订单包含以下重要字段:
- userId:订单关联的用户ID;
- sequenceId:定序ID,相同价格的订单根据定序ID进行排序;
- direction:订单方向:买或卖;
- price:订单价格;
- quantity:订单数量;
- unfilledQuantity:尚未成交的数量;
- status:订单状态,包括等待成交、部分成交、完全成交、部分取消、完全取消。
一个订单被成功创建后,它后续由撮合引擎处理时,只有unfilledQuantity
和status
会发生变化,其他属性均为只读,不会改变。
当订单状态变为完全成交、部分取消、完全取消时,订单就已经处理完成。处理完成的订单从订单系统中删除,并写入数据库永久变为历史订单。用户查询活动订单时,需要读取订单系统,用户查询历史订单时,只需从数据库查询,就与订单系统无关了。
我们定义OrderEntity
如下:
public class OrderEntity { // 订单ID / 定序ID / 用户ID: public Long id; public long sequenceId; public Long userId;
// 价格 / 方向 / 状态: public BigDecimal price; public Direction direction; public OrderStatus status;
// 订单数量 / 未成交数量: public BigDecimal quantity; public BigDecimal unfilledQuantity;
// 创建和更新时间: public long createdAt; public long updatedAt;}
处于简化设计的缘故,该对象既作为订单系统的订单对象,也作为数据库映射实体。
根据业务需要,订单系统需要支持:
- 根据订单ID查询到订单;
- 根据用户ID查询到该用户的所有活动订单。
因此,OrderService
需要用两个Map
存储活动订单:
public class OrderService { // 跟踪所有活动订单: Order ID => OrderEntity final ConcurrentMap<Long, OrderEntity> activeOrders = new ConcurrentHashMap<>();
// 跟踪用户活动订单: User ID => Map(Order ID => OrderEntity) final ConcurrentMap<Long, ConcurrentMap<Long, OrderEntity>> userOrders = new ConcurrentHashMap<>();
添加一个新的Order
时,需要同时更新activeOrders
和userOrders
。同理,删除一个Order
时,需要同时从activeOrders
和userOrders
中删除。
我们先编写创建订单的方法:
/** * 创建订单,失败返回null: */public OrderEntity createOrder(long sequenceId, long ts, Long orderId, Long userId, Direction direction, BigDecimal price, BigDecimal quantity) { switch (direction) { case BUY -> { // 买入,需冻结USD: if (!assetService.tryFreeze(userId, AssetEnum.USD, price.multiply(quantity))) { return null; } } case SELL -> { // 卖出,需冻结BTC: if (!assetService.tryFreeze(userId, AssetEnum.BTC, quantity)) { return null; } } default -> throw new IllegalArgumentException("Invalid direction."); } // 实例化Order: OrderEntity order = new OrderEntity(); order.id = orderId; order.sequenceId = sequenceId; order.userId = userId; order.direction = direction; order.price = price; order.quantity = quantity; order.unfilledQuantity = quantity; order.createdAt = order.updatedAt = ts; // 添加到ActiveOrders: this.activeOrders.put(order.id, order); // 添加到UserOrders: ConcurrentMap<Long, OrderEntity> uOrders = this.userOrders.get(userId); if (uOrders == null) { uOrders = new ConcurrentHashMap<>(); this.userOrders.put(userId, uOrders); } uOrders.put(order.id, order); return order;}
后续在清算过程中,如果发现一个Order
已经完成或取消后,需要调用删除方法将活动订单从OrderService
中删除:
public void removeOrder(Long orderId) { // 从ActiveOrders中删除: OrderEntity removed = this.activeOrders.remove(orderId); if (removed == null) { throw new IllegalArgumentException("Order not found by orderId in active orders: " + orderId); } // 从UserOrders中删除: ConcurrentMap<Long, OrderEntity> uOrders = userOrders.get(removed.userId); if (uOrders == null) { throw new IllegalArgumentException("User orders not found by userId: " + removed.userId); } if (uOrders.remove(orderId) == null) { throw new IllegalArgumentException("Order not found by orderId in user orders: " + orderId); }}
删除订单时,必须从activeOrders
和userOrders
中全部成功删除,否则会造成OrderService
内部状态混乱。
最后,根据业务需求,我们加上根据订单ID查询、根据用户ID查询的方法:
// 根据订单ID查询Order,不存在返回null:public OrderEntity getOrder(Long orderId) { return this.activeOrders.get(orderId);}// 根据用户ID查询用户所有活动Order,不存在返回null:public ConcurrentMap<Long, OrderEntity> getUserOrders(Long userId) { return this.userOrders.get(userId);}
整个订单子系统的实现就是这么简单。
下面是问题解答。
Order的id和sequenceId为何不合并使用一个ID?
订单ID是Order.id,是用户看到的订单标识,而Order.sequenceId是系统内部给订单的定序序列号,用于后续撮合时进入订单簿的排序,两者功能不同。
可以使用一个简单的算法来根据Sequence ID计算Order ID:
OrderID = SequenceID * 10000 + today("YYmm")
因为SequenceID是全局唯一的,我们给SequenceID添加创建日期的”YYmm”部分,可轻松实现按月分库保存和查询。
参考源码
小结
一个订单系统在内存中维护所有用户的活动订单,并提供删除和查询方法。
24.3.3 设计撮合引擎
在证券交易系统中,撮合引擎是实现买卖盘成交的关键组件。我们先分析撮合引擎的工作原理,然后设计并实现一个最简化的撮合引擎。
在证券市场中,撮合交易是一种微观价格发现模型,它允许买卖双方各自提交买卖订单并报价,按价格优先,时间优先的顺序,凡买单价格大于等于卖单价格时,双方即达成价格协商并成交。在A股身经百战的老股民对此规则应该非常熟悉,这里不再详述。
我们将讨论如何从技术上来实现它。对于撮合引擎来说,它必须维护两个买卖盘列表,一个买盘,一个卖盘,买盘按价格从高到低排序,确保报价最高的订单排在最前面;卖盘则相反,按照价格从低到高排序,确保报价最低的卖单排在最前面。
下图是一个实际的买卖盘:
对于买盘来说,上图的订单排序为2086.50
,2086.09
,2086.06
,2086.00
,2085.97
,……
对于卖盘来说,上图的订单排序为2086.55
,2086.75
,2086.77
,2086.90
,2086.99
,……
不可能出现买1价格大于等于卖1价格的情况,因为这意味着应该成交的买卖订单没有成交却在订单簿上等待成交。
对于多个价格相同的订单,例如2086.55
,很可能张三卖出1,李四卖出3,累计数量是4。当一个新的买单价格≥2086.55
时,到底优先和张三的卖单成交还是优先和李四的卖单成交呢?这要看张三和李四的订单时间谁更靠前。
我们在订单上虽然保存了创建时间,但排序时,是根据定序ID即sequenceId
来排序,以确保全局唯一。时间本身实际上是订单的一个普通属性,仅展示给用户,不参与业务排序。
下一步是实现订单簿OrderBook
的表示。一个直观的想法是使用List<Order>
,并对订单进行排序。但是,在证券交易中,使用List
会导致两个致命问题:
- 插入新的订单时,必须从头扫描
List<Order>
,以便在合适的地方插入Order
,平均耗时O(N); - 取消订单时,也必须从头扫描
List<Order>
,平均耗时O(N)。
更好的方法是使用红黑树,它是一种自平衡的二叉排序树,插入和删除的效率都是O(logN),对应的Java类是TreeMap
。
所以我们定义OrderBook
的结构就是一个TreeMap<OrderKey, OrderEntity>
,它的排序根据OrderKey
决定。由业务规则可知,负责排序的OrderKey
只需要sequenceId
和price
即可:
// 以record实现的OrderKey:public record OrderKey(long sequenceId, BigDecimal price) {}
因此,OrderBook
的核心数据结构就可以表示如下:
public class OrderBook { public final Direction direction; // 方向 public final TreeMap<OrderKey, Order> book; // 排序树
public OrderBook(Direction direction) { this.direction = direction; this.book = new TreeMap<>(???); }}
有的童鞋注意到TreeMap
的排序要求实现Comparable
接口或者提供一个Comparator
。我们之所以没有在OrderKey
上实现Comparable
接口是因为买卖盘排序的价格规则不同,因此,编写两个Comparator
分别用于排序买盘和卖盘:
private static final Comparator<OrderKey> SORT_SELL = new Comparator<>() { public int compare(OrderKey o1, OrderKey o2) { // 价格低在前: int cmp = o1.price().compareTo(o2.price()); // 时间早在前: return cmp == 0 ? Long.compare(o1.sequenceId(), o2.sequenceId()) : cmp; }};
private static final Comparator<OrderKey> SORT_BUY = new Comparator<>() { public int compare(OrderKey o1, OrderKey o2) { // 价格高在前: int cmp = o2.price().compareTo(o1.price()); // 时间早在前: return cmp == 0 ? Long.compare(o1.sequenceId(), o2.sequenceId()) : cmp; }};
这样,OrderBook
的TreeMap
排序就由Direction
指定:
public OrderBook(Direction direction) { this.direction = direction; this.book = new TreeMap<>(direction == Direction.BUY ? SORT_BUY : SORT_SELL);}
这里友情提示Java的BigDecimal
比较大小的大坑:比较两个BigDecimal
是否值相等,一定要用compareTo()
,不要用equals()
,因为1.2
和1.20
因为scale
不同导致equals()
返回false
。
在Java中比较两个BigDecimal的值只能使用compareTo(),不能使用equals()!
再给OrderBook
添加插入、删除和查找首元素方法:
public OrderEntity getFirst() { return this.book.isEmpty() ? null : this.book.firstEntry().getValue();}
public boolean remove(OrderEntity order) { return this.book.remove(new OrderKey(order.sequenceId, order.price)) != null;}
public boolean add(OrderEntity order) { return this.book.put(new OrderKey(order.sequenceId, order.price), order) == null;}
现在,有了买卖盘,我们就可以编写撮合引擎了。定义MatchEngine
核心数据结构如下:
public class MatchEngine { public final OrderBook buyBook = new OrderBook(Direction.BUY); public final OrderBook sellBook = new OrderBook(Direction.SELL); public BigDecimal marketPrice = BigDecimal.ZERO; // 最新市场价 private long sequenceId; // 上次处理的Sequence ID}
一个完整的撮合引擎包含一个买盘、一个卖盘和一个最新成交价(初始值为0)。撮合引擎的输入是一个OrderEntity
实例,每处理一个订单,就输出撮合结果MatchResult
,核心处理方法定义如下:
public MatchResult processOrder(long sequenceId, OrderEntity order) { ...}
下面我们讨论如何处理一个具体的订单。对于撮合交易来说,如果新订单是一个买单,则首先尝试在卖盘中匹配价格合适的卖单,如果匹配成功则成交。一个大的买单可能会匹配多个较小的卖单。当买单被完全匹配后,说明此买单已完全成交,处理结束,否则,如果存在未成交的买单,则将其放入买盘。处理卖单的逻辑是类似的。
我们把已经挂在买卖盘的订单称为挂单(Maker),当前正在处理的订单称为吃单(Taker),一个Taker订单如果未完全成交则转为Maker挂在买卖盘,因此,处理当前Taker订单的逻辑如下:
public MatchResult processOrder(long sequenceId, OrderEntity order) { switch (order.direction) { case BUY: // 买单与sellBook匹配,最后放入buyBook: return processOrder(order, this.sellBook, this.buyBook); case SELL: // 卖单与buyBook匹配,最后放入sellBook: return processOrder(order, this.buyBook, this.sellBook); default: throw new IllegalArgumentException("Invalid direction."); }}
MatchResult processOrder(long sequenceId, OrderEntity takerOrder, OrderBook makerBook, OrderBook anotherBook) { ...}
根据价格匹配,直到成交双方有一方完全成交或成交条件不满足时结束处理,我们直接给出processOrder()
的业务逻辑代码:
MatchResult processOrder(long sequenceId, OrderEntity takerOrder, OrderBook makerBook, OrderBook anotherBook) { this.sequenceId = sequenceId; long ts = takerOrder.createdAt; MatchResult matchResult = new MatchResult(takerOrder); BigDecimal takerUnfilledQuantity = takerOrder.quantity; for (;;) { OrderEntity makerOrder = makerBook.getFirst(); if (makerOrder == null) { // 对手盘不存在: break; } if (takerOrder.direction == Direction.BUY && takerOrder.price.compareTo(makerOrder.price) < 0) { // 买入订单价格比卖盘第一档价格低: break; } else if (takerOrder.direction == Direction.SELL && takerOrder.price.compareTo(makerOrder.price) > 0) { // 卖出订单价格比买盘第一档价格高: break; } // 以Maker价格成交: this.marketPrice = makerOrder.price; // 待成交数量为两者较小值: BigDecimal matchedQuantity = takerUnfilledQuantity.min(makerOrder.unfilledQuantity); // 成交记录: matchResult.add(makerOrder.price, matchedQuantity, makerOrder); // 更新成交后的订单数量: takerUnfilledQuantity = takerUnfilledQuantity.subtract(matchedQuantity); BigDecimal makerUnfilledQuantity = makerOrder.unfilledQuantity.subtract(matchedQuantity); // 对手盘完全成交后,从订单簿中删除: if (makerUnfilledQuantity.signum() == 0) { makerOrder.updateOrder(makerUnfilledQuantity, OrderStatus.FULLY_FILLED, ts); makerBook.remove(makerOrder); } else { // 对手盘部分成交: makerOrder.updateOrder(makerUnfilledQuantity, OrderStatus.PARTIAL_FILLED, ts); } // Taker订单完全成交后,退出循环: if (takerUnfilledQuantity.signum() == 0) { takerOrder.updateOrder(takerUnfilledQuantity, OrderStatus.FULLY_FILLED, ts); break; } } // Taker订单未完全成交时,放入订单簿: if (takerUnfilledQuantity.signum() > 0) { takerOrder.updateOrder(takerUnfilledQuantity, takerUnfilledQuantity.compareTo(takerOrder.quantity) == 0 ? OrderStatus.PENDING : OrderStatus.PARTIAL_FILLED, ts); anotherBook.add(takerOrder); } return matchResult;}
可见,撮合匹配的业务逻辑是相对简单的。撮合结果记录在MatchResult
中,它可以用一个Taker订单和一系列撮合匹配记录表示:
public class MatchResult { public final Order takerOrder; public final List<MatchDetailRecord> MatchDetails = new ArrayList<>();
// 构造方法略}
每一笔撮合记录则由成交双方、成交价格与数量表示:
public record MatchDetailRecord( BigDecimal price, BigDecimal quantity, OrderEntity takerOrder, OrderEntity makerOrder) {}
撮合引擎返回的MatchResult
包含了本次处理的完整结果,下一步需要把MatchResult
发送给清算系统,对交易双方进行清算即完成了整个交易的处理。
我们可以编写一个简单的测试来验证撮合引擎工作是否正常。假设如下的订单依次输入到撮合引擎:
// 方向 价格 数量buy 2082.34 1sell 2087.6 2buy 2087.8 1buy 2085.01 5sell 2088.02 3sell 2087.60 6buy 2081.11 7buy 2086.0 3buy 2088.33 1sell 2086.54 2sell 2086.55 5buy 2086.55 3
经过撮合后最终买卖盘及市场价如下:
2088.02 32087.60 62086.55 4---------2086.55---------2086.00 32085.01 52082.34 12081.11 7
如果我们仔细观察整个系统的输入和输出,输入实际上是一系列按时间排序后的订单(实际排序按sequenceId
),输出是一系列MatchResult
,内部状态的变化就是买卖盘以及市场价的变化。如果两个初始状态相同的MatchEngine
,输入的订单序列是完全相同的,则我们得到的MatchResult
输出序列以及最终的内部状态也是完全相同的。
下面是问题解答。
如何实现多个交易对?
一个撮合引擎只能处理一个交易对,如果要实现多个交易对,则需要构造一个“多撮合实例”的引擎:
class MatchEngineGroup { Map<Long, MatchEngine> engines = new HashMap<>(); public MatchResult processOrder(long sequenceId, OrderEntity order) { // 获得订单的交易对ID: Long symbolId = order.symbolId; // 查找交易对所对应的引擎实例: MatchEngine engine = engines.get(symbolId); if (engine == null) { // 该交易对的第一个订单: engine = new MatchEngine(); engines.put(symbolId, engine); } // 由该实例处理订单: return engine.processOrder(sequenceId, order); }}
需要给订单增加symbolId
属性以标识该订单是哪个交易对。
参考源码
小结
本文讨论并实现了一个可工作的撮合引擎核心。实现撮合引擎的关键在于将业务模型转换为高效的数据结构。只要保证核心数据结构的简单和高效,撮合引擎的业务逻辑编写是非常容易的。
24.3.4 设计清算系统
在证券交易系统中,一个订单成功创建后,经过撮合引擎,就可以输出撮合结果。但此时买卖双方的资产还没有变化,要把撮合结果最终实现为买卖双方的资产交换,就需要清算。
清算系统就是处理撮合结果,将买卖双方冻结的USD和BTC分别交换到对方的可用余额,就使得买卖双方真正完成了资产交换。
因此,我们设计清算系统ClearingService
,需要引用AssetService
和OrderService
:
public class ClearingService { final AssetService assetService; final OrderService orderService;
public ClearingService(@Autowired AssetService assetService, @Autowired OrderService orderService) { this.assetService = assetService; this.orderService = orderService; }}
当撮合引擎输出MatchResult
后,ClearingService
需要处理该结果,该清算方法代码框架如下:
public void clearMatchResult(MatchResult result) { OrderEntity taker = result.takerOrder; switch (taker.direction) { case BUY -> { // TODO } case SELL -> { // TODO } default -> throw new IllegalArgumentException("Invalid direction."); }}
对Taker买入成交的订单,处理时需要注意,成交价格是按照Maker的报价成交的,而Taker冻结的金额是按照Taker订单的报价冻结的,因此,解冻后,部分差额要退回至Taker可用余额:
case BUY -> { // 买入时,按Maker的价格成交: for (MatchDetailRecord detail : result.matchDetails) { OrderEntity maker = detail.makerOrder(); BigDecimal matched = detail.quantity(); if (taker.price.compareTo(maker.price) > 0) { // 实际买入价比报价低,部分USD退回账户: BigDecimal unfreezeQuote = taker.price.subtract(maker.price).multiply(matched); assetService.unfreeze(taker.userId, AssetEnum.USD, unfreezeQuote); } // 买方USD转入卖方账户: assetService.transfer(Transfer.FROZEN_TO_AVAILABLE, taker.userId, maker.userId, AssetEnum.USD, maker.price.multiply(matched)); // 卖方BTC转入买方账户: assetService.transfer(Transfer.FROZEN_TO_AVAILABLE, maker.userId, taker.userId, AssetEnum.BTC, matched); // 删除完全成交的Maker: if (maker.unfilledQuantity.signum() == 0) { orderService.removeOrder(maker.id); } } // 删除完全成交的Taker: if (taker.unfilledQuantity.signum() == 0) { orderService.removeOrder(taker.id); }}
对Taker卖出成交的订单,只需将冻结的BTC转入Maker,将Maker冻结的USD转入Taker即可:
case SELL -> { for (MatchDetailRecord detail : result.matchDetails) { OrderEntity maker = detail.makerOrder(); BigDecimal matched = detail.quantity(); // 卖方BTC转入买方账户: assetService.transfer(Transfer.FROZEN_TO_AVAILABLE, taker.userId, maker.userId, AssetEnum.BTC, matched); // 买方USD转入卖方账户: assetService.transfer(Transfer.FROZEN_TO_AVAILABLE, maker.userId, taker.userId, AssetEnum.USD, maker.price.multiply(matched)); // 删除完全成交的Maker: if (maker.unfilledQuantity.signum() == 0) { orderService.removeOrder(maker.id); } } // 删除完全成交的Taker: if (taker.unfilledQuantity.signum() == 0) { orderService.removeOrder(taker.id); }}
当用户取消订单时,ClearingService
需要取消订单冻结的USD或BTC,然后将订单从OrderService
中删除:
public void clearCancelOrder(OrderEntity order) { switch (order.direction) { case BUY -> { // 解冻USD = 价格 x 未成交数量 assetService.unfreeze(order.userId, AssetEnum.USD, order.price.multiply(order.unfilledQuantity)); } case SELL -> { // 解冻BTC = 未成交数量 assetService.unfreeze(order.userId, AssetEnum.BTC, order.unfilledQuantity); } default -> throw new IllegalArgumentException("Invalid direction."); } // 从OrderService中删除订单: orderService.removeOrder(order.id);}
这样,我们就完成了清算系统的实现。
下面是问题解答。
如果有手续费,如何清算?
如果有交易手续费,则首先需要思考:手续费应该定义在哪?
如果我们把手续费定义为一个配置,注入到ClearingService
:
public class ClearingService { @Value("${exchange.fee-rate:0.0005}") BigDecimal feeRate;}
那么问题来了:对于同一个订单输入序列,设定手续费为万分之五,和设定手续费为万分之二,执行后交易引擎的状态和输出结果是不同的!这就使得交易引擎不再是一个确定性状态机,无法重复执行交易序列。
此外,不同用户通常可以有不同的交易费率,例如机构的费率比个人低,做市商的费率可以为0。
要支持不同用户不同的费率,以及保证交易引擎是一个确定性状态机,手续费必须作为订单的一个不变属性,从外部输入,这样交易引擎不再关心如何读取费率。
带手续费的订单在创建时,针对买单,冻结金额不再是价格x数量,而是:
freeze = order.price * order.quantity * (1 + order.feeRate)
首先,需要修改OrderService
创建订单时的冻结逻辑。其次,在清算时,除了买卖双方交换资产,还需要设定一个系统用户,专门接收手续费,将买方手续费从冻结的金额转入系统手续费用户,而卖方获得转入的金额会扣除手续费。
可以为挂单和吃单设置不同的手续费率吗?
可以,需要给订单添加两个费率属性:takerFeeRate
和makerFeeRate
,买方下单冻结时,额外冻结的金额按takerFeeRate
冻结。
清算逻辑会复杂一些,要针对Taker和Maker分别计算不同的费率。
可以设置负费率吗?
可以,通常可以给makerFeeRate
设置负费率,以鼓励做市。清算逻辑会更复杂一些,因为针对负费率的Maker,需要从系统手续费用户转账给Maker。
参考源码
小结
清算系统只负责根据撮合引擎输出的结果进行清算,清算的本质就是根据成交价格和数量对买卖双方的对应资产互相划转。清算系统本身没有状态。
24.3.5 完成交易引擎
我们现在实现了资产模块、订单模块、撮合引擎和清算模块,现在,就可以把它们组合起来,实现一个完整的交易引擎:
public class TradingEngineService { @Autowired AssetService assetService;
@Autowired OrderService orderService;
@Autowired MatchEngine matchEngine;
@Autowired ClearingService clearingService;}
交易引擎由事件驱动,因此,通过订阅Kafka的Topic实现批量读消息,然后依次处理每个事件:
void processMessages(List<AbstractEvent> messages) { for (AbstractEvent message : messages) { processEvent(message); }}
void processEvent(AbstractEvent event) { if (event instanceof OrderRequestEvent) { createOrder((OrderRequestEvent) event); } else if (event instanceof OrderCancelEvent) { cancelOrder((OrderCancelEvent) event); } else if (event instanceof TransferEvent) { transfer((TransferEvent) event); }}
我们目前一共有3种类型的事件,处理都非常简单。以createOrder()
为例,核心代码其实就几行:
void createOrder(OrderRequestEvent event) { // 生成Order ID: long orderId = event.sequenceId * 10000 + (year * 100 + month); // 创建Order: OrderEntity order = orderService.createOrder(event.sequenceId, event.createdAt, orderId, event.userId, event.direction, event.price, event.quantity); if (order == null) { logger.warn("create order failed."); return; } // 撮合: MatchResult result = matchEngine.processOrder(event.sequenceId, order); // 清算: clearingService.clearMatchResult(result);}
核心的业务逻辑并不复杂,只是交易引擎在处理完订单后,仅仅改变自身状态是不够的,它还得向外输出具体的成交信息、订单状态等。因此,需要根据业务需求,在清算后继续收集撮合结果、已完成订单、准备发送的通知等,通过消息系统或Redis向外输出交易信息。如果把这些功能放到同一个线程内同步完成是非常耗时的,更好的方法是把它们先存储起来,再异步处理。例如,对于已完成的订单,可以异步落库:
Queue<List<OrderEntity>> orderQueue = new ConcurrentLinkedQueue<>();
void createOrder(OrderRequestEvent event) { ... // 清算完成后,收集已完成Order: if (!result.matchDetails.isEmpty()) { List<OrderEntity> closedOrders = new ArrayList<>(); if (result.takerOrder.status.isFinalStatus) { closedOrders.add(result.takerOrder); } for (MatchDetailRecord detail : result.matchDetails) { OrderEntity maker = detail.makerOrder(); if (maker.status.isFinalStatus) { closedOrders.add(maker); } } this.orderQueue.add(closedOrders); }}
// 启动一个线程将orderQueue的Order异步写入数据库:void saveOrders() { // TODO:}
类似的,输出OrderBook、通知用户成交等信息都是异步处理。
接下来,我们再继续完善processEvent()
,处理单个事件时,在处理具体的业务逻辑之前,我们首先根据sequenceId
判断是否是重复消息,是重复消息就丢弃:
void processEvent(AbstractEvent event) { if (event.sequenceId <= this.lastSequenceId) { logger.warn("skip duplicate event: {}", event); return; } // TODO:}
紧接着,我们判断是否丢失了消息,如果丢失了消息,就根据上次处理的消息的sequenceId
,从数据库里捞出后续消息,直到赶上当前消息的sequenceId
为止:
// 判断是否丢失了消息:if (event.previousId > this.lastSequenceId) { // 从数据库读取丢失的消息: List<AbstractEvent> events = storeService.loadEventsFromDb(this.lastSequenceId); if (events.isEmpty()) { // 读取失败: System.exit(1); return; } // 处理丢失的消息: for (AbstractEvent e : events) { this.processEvent(e); } return;}// 判断当前消息是否指向上一条消息:if (event.previousId != lastSequenceId) { System.exit(1); return;}// 正常处理:...// 更新lastSequenceId:this.lastSequenceId = event.sequenceId;
这样一来,我们对消息系统的依赖就不是要求它100%可靠,遇到重复消息、丢失消息,交易引擎都可以从这些错误中自己恢复。
由于资产、订单、撮合、清算都在内存中完成,如何保证交易引擎每处理一个事件,它的内部状态都是正确的呢?我们可以为交易引擎增加一个自验证功能,在debug模式下,每处理一个事件,就自动验证内部状态的完整性,包括:
- 验证资产系统总额为0,且除负债账户外其余账户资产不为负;
- 验证订单系统未成交订单所冻结的资产与资产系统中的冻结一致;
- 验证订单系统的订单与撮合引擎的订单簿一对一存在。
void processEvent(AbstractEvent event) { ... if (debugMode) { this.validate(); }}
这样我们就能快速在开发阶段尽可能早地发现问题。
交易引擎的测试也相对比较简单。对于同一组输入,每次运行都会得到相同的结果,所以我们可以构造几组确定的输入来验证交易引擎:
class TradingEngineServiceTest { @Test public void testTradingEngine() { // TODO: }}
下面是问题解答。
交易引擎崩溃后如何恢复?
交易引擎如果运行时崩溃,可以重启,重启后先把现有的所有交易事件重头开始执行一遍,即可得到最新的状态。
注意到重头开始执行交易事件,会导致重复发出市场成交、用户订单通知等事件,因此,可根据时间做判断,不再重复发通知。下游系统在处理通知事件时,也要根据通知携带的sequenceId
做去重判断。
有的童鞋会问,如果现有的交易事件已经有几千万甚至几十亿,从头开始执行如果需要花费几个小时甚至几天,怎么办?
可以定期把交易引擎的状态序列化至文件系统,例如,每10分钟一次。当交易引擎崩溃时,读取最新的状态文件,即可恢复至约10分钟前的状态,后续追赶只需要执行很少的事件消息。
如何序列化交易引擎的状态?
交易引擎的状态包括:
- 资产系统的状态:即所有用户的资产列表;
- 订单系统的状态:即所有活动订单列表;
- 撮合引擎的状态:即买卖盘和最新市场价;
- 最后一次处理的sequenceId。
序列化时,分别针对每个子系统进行序列化。对资产系统来说,每个用户的资产可序列化为用户ID: [USD可用, USD冻结, BTC可用, BTC冻结]
的JSON格式,整个资产系统序列化后结构如下:
{ "1": [-123000, 0, -12.3, 0], "100": [60000, 20000, 9, 0], "200": [43000, 0, 3, 0.3]}
订单系统可序列化为一系列活动订单列表:
[ { "id": 10012207, "sequenceId": 1001, "price": 20901, ...}, { "id": 10022207, "sequenceId": 1002, "price": 20902, ...},]
撮合引擎可序列化为买卖盘列表(仅包含订单ID):
{ "BUY": [10012207, 10022207, ...], "SELL": [...], "marketPrice": 20901}
最后合并为一个交易引擎的状态文件:
{ "sequenceId": 189000, "assets": { ... }, "orders": [ ... ], "match": { ... }}
交易引擎启动时,读取状态文件,然后依次恢复资产系统、订单系统和撮合引擎的状态,就得到了指定sequenceId
的状态。
写入状态时,如果是异步写入,需要先复制状态、再写入,防止多线程读同一实例导致状态不一致。读写JSON时,要使用JSON库的流式API(例如Jackson的Streaming API),以免内存溢出。对BigDecimal
进行序列化时,要注意不要误读为double
类型以免丢失精度。
参考源码
小结
交易引擎是以事件驱动的状态机模型,同样的输入将得到同样的输出。为提高交易系统的健壮性,可以自动检测重复消息和消息丢失并自动恢复。
24.4 设计定序系统
当系统通过API接收到所有交易员发送的订单请求后,就需要按接收顺序对订单请求进行定序。
定序的目的是在系统内部完成订单请求排序,排序的同时给每个订单请求一个全局唯一递增的序列号,然后将排序后的订单请求发送至交易引擎。
因此,定序系统的输入是上游发送的事件消息,输出是定序后的带Sequence ID的事件,这样,下游的交易引擎就可以由确定性的事件进行驱动。
除了对订单请求进行定序,定序系统还需要对撤消订单、转账请求进行定序,因此,输入的事件消息包括:
- OrderRequestEvent:订单请求;
- OrderCancelEvent:订单取消;
- TransferEvent:转账请求。
对于某些类型的事件,例如转账请求,它必须被处理一次且仅处理一次。而消息系统本质上也是一个分布式网络应用程序,它的内部也有缓存、重试等机制。一般来说,消息系统可以实现的消息传输模式有:
- 消息保证至少发送成功一次,也就是可能会重复发送(At least once);
- 消息只保证最多发送一次,也就是要么成功,要么失败(At most once);
- 消息保证发送成功且仅发送成功一次(Exactly once)。
实际上,第3种理想情况基本不存在,没有任何基于网络的消息系统能实现这种模式,所以,大部分消息系统都是按照第1种方式来设计,也就是基于确认+重试的机制保证消息可靠到达。
而定序系统要处理的事件消息,例如转账请求,如果消息重复了多次,就会造成重复转账,所以,我们还需要对某些事件消息作特殊处理,让发送消息的客户端给这个事件消息添加一个全局唯一ID,定序系统根据全局唯一ID去重,而不是依赖消息中间件的能力。
此外,为了让下游系统,也就是交易引擎能一个不漏地按顺序接收定序后的事件消息,我们也不能相信消息中间件总是在理想状态下工作。
除了给每个事件消息设置一个唯一递增ID外,定序系统还同时给每个事件消息附带前一事件的ID,这样就形成了一个微型“区块链”:
┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐│sid=1│ │sid=2│ │sid=3│ │sid=4││pid=0│──▶│pid=1│──▶│pid=2│──▶│pid=3││msg=A│ │msg=B│ │msg=C│ │msg=D│└─────┘ └─────┘ └─────┘ └─────┘
由于下游接收方可以根据Sequence ID去重,因此,重复发送的消息会被忽略:
┌─────┐┌─────┐┌─────┐┌ ─ ─ ┐┌ ─ ─ ┐┌─────┐│sid=1││sid=2││sid=3│ sid=2 sid=3 │sid=4││pid=0││pid=1││pid=2││pid=1││pid=2││pid=3││msg=A││msg=B││msg=C│ msg=B msg=C │msg=D│└─────┘└─────┘└─────┘└ ─ ─ ┘└ ─ ─ ┘└─────┘
如果出现消息丢失:
┌─────┐┌─────┐┌ ─ ─ ┐┌─────┐│sid=1││sid=2│ │sid=4││pid=0││pid=1││ ││pid=3││msg=A││msg=B│ │msg=D│└─────┘└─────┘└ ─ ─ ┘└─────┘
由于存在Previous ID,下游接收方可以检测到丢失,于是,接收方可以根据上次收到的ID去数据库查询,直到读取到最新的Sequence ID为止。只要定序系统先将定序后的事件消息落库,再发送给下游,就可以保证无论是消息重复还是丢失,接收方都可以正确处理:
┌─────────┐ ┌─────────┐ ┌─────────┐│Sequencer│──▶│ MQ │──▶│ Engine │└─────────┘ └─────────┘ └─────────┘ │ ┌─────────┐ │ └───────▶│ DB │◀───────┘ └─────────┘
整个过程中,丢失极少量消息不会对系统的可用性造成影响,这样就极大地减少了系统的运维成本和线上排错成本。
最后,无论是接收方还是发送方,为了提高消息收发的效率,应该总是使用批处理方式。定序系统采用批量读+批量batch写入数据库+批量发送消息的模式,可以显著提高TPS。
下面我们一步一步地实现定序系统。
首先定义要接收的事件消息,它包含一个Sequence ID、上一个Sequence ID以及一个可选的用于去重的全局唯一ID:
public class AbstractEvent extends AbstractMessage { // 定序后的Sequence ID: public long sequenceId;
// 定序后的Previous Sequence ID: public long previousId;
// 可选的全局唯一标识: @Nullable public String uniqueId;}
定序系统接收的事件仅包含可选的uniqueId
,忽略sequenceId
和previousId
。定序完成后,把sequenceId
和previousId
设置好,再发送给下游。
SequenceService
用于接收上游消息、定序、发送消息给下游:
@Componentpublic class SequenceService { @Autowired SequenceHandler sequenceHandler;
// 全局唯一递增ID: private AtomicLong sequence;
// 接收消息并定序再发送: synchronized void processMessages(List<AbstractEvent> messages) { // 定序后的事件消息: List<AbstractEvent> sequenced = null; try { // 定序: sequenced = this.sequenceHandler.sequenceMessages(this.messageTypes, this.sequence, messages); } catch (Throwable e) { // 定序出错时进程退出: logger.error("exception when do sequence", e); System.exit(1); throw new Error(e); } // 发送定序后的消息: sendMessages(sequenced); }}
SequenceHandler
是真正写入Sequence ID并落库的:
@Component@Transactional(rollbackFor = Throwable.class)public class SequenceHandler { public List<AbstractEvent> sequenceMessages(MessageTypes messageTypes, AtomicLong sequence, List<AbstractEvent> messages) throws Exception { // 利用UniqueEventEntity去重: List<UniqueEventEntity> uniques = null; Set<String> uniqueKeys = null; List<AbstractEvent> sequencedMessages = new ArrayList<>(messages.size()); List<EventEntity> events = new ArrayList<>(messages.size()); for (AbstractEvent message : messages) { UniqueEventEntity unique = null; final String uniqueId = message.uniqueId; // 在数据库中查找uniqueId检查是否已存在: if (uniqueId != null) { if ((uniqueKeys != null && uniqueKeys.contains(uniqueId)) || db.fetch(UniqueEventEntity.class, uniqueId) != null) { // 忽略已处理的重复消息: logger.warn("ignore processed unique message: {}", message); continue; } unique = new UniqueEventEntity(); unique.uniqueId = uniqueId; if (uniques == null) { uniques = new ArrayList<>(); } uniques.add(unique); if (uniqueKeys == null) { uniqueKeys = new HashSet<>(); } uniqueKeys.add(uniqueId); } // 上次定序ID: long previousId = sequence.get(); // 本次定序ID: long currentId = sequence.incrementAndGet(); // 先设置message的sequenceId / previouseId,再序列化并落库: message.sequenceId = currentId; message.previousId = previousId; // 如果此消息关联了UniqueEvent,给UniqueEvent加上相同的sequenceId: if (unique != null) { unique.sequenceId = message.sequenceId; } // 准备写入数据库的Event: EventEntity event = new EventEntity(); event.previousId = previousId; event.sequenceId = currentId; event.data = messageTypes.serialize(message); events.add(event); // 添加到结果集: sequencedMessages.add(message); } // 落库: if (uniques != null) { db.insert(uniques); } db.insert(events); // 返回定序后的消息: return sequencedMessages; }}
在SequenceService
中调用SequenceHandler
是因为我们写入数据库时需要利用Spring提供的声明式数据库事务,而消息的接收和发送并不需要被包含在数据库事务中。
最后,我们来考虑其他一些细节问题。
如何在定序器重启后正确初始化下一个序列号?
正确初始化下一个序列号实际上就是要把一个正确的初始值给AtomicLong sequence
字段。可以读取数据库获得当前最大的Sequence ID,这个Sequence ID就是上次最后一次定序的ID。
如何在定序器崩溃后自动恢复?
由于任何一个时候都只能有一个定序器工作,这样才能保证Sequence ID的正确性,因此,无法让两个定序器同时工作。
虽然无法让两个定序器同时工作,但可以让两个定序器以主备模式同时运行,仅主定序器工作。当主定序器崩溃后,备用定序器自动切换为主定序器接管后续工作即可。
为了实现主备模式,可以启动两个定序器,然后抢锁的形式确定主备。抢到锁的定序器开始工作,并定期刷新锁,未抢到锁的定序器定期检查锁。可以用数据库锁实现主备模式。
如何解决定序的性能瓶颈?
通常来说,消息系统的吞吐量远超数据库。定序的性能取决于批量写入数据库的能力。首先要提高数据库的性能,其次考虑按Sequence ID进行分库,但分库会提高定序的复杂度,也会使下游从数据库读取消息时复杂度增加。最后,可以考虑使用专门针对时序优化的数据库,但这样就不如MySQL这种数据库通用、易用。
参考源码
小结
定序系统负责给每个事件一个唯一递增序列号。通过引用前一个事件的序列号,可以构造一个能自动检测连续性的事件流。
24.5 设计API系统
有了交易引擎和定序系统,我们还需要一个API系统,用于接收所有交易员的订单请求。
相比事件驱动的交易引擎,API系统就比较简单,因为它就是一个标准的Web应用。
在编写API之前,我们需要对请求进行认证,即识别出是哪个用户发出的请求。用户认证放在Filter中是最合适的。认证方式可以是简单粗暴的用户名+口令,也可以是Token,也可以是API Key+API Secret等模式。
我们先实现一个最简单的用户名+口令的认证方式。需要注意的是,API和Web页面不同,Web页面可以给用户一个登录页,登录成功后设置Session或Cookie,后续请求检查的是Session或Cookie。API不能使用Session,因为Session很难做无状态集群,API也不建议使用Cookie,因为API域名很可能与Web UI的域名不一致,拿不到Cookie。要在API中使用用户名+口令的认证方式,可以用标准的HTTP头Authorization的Basic
模式:
Authorization: Basic 用户名:口令
因此,我们可以尝试从Authorization
中获取用户名和口令来认证:
Long parseUserFromAuthorization(String auth) { if (auth.startsWith("Basic ")) { // 用Base64解码: String eap = new String(Base64.getDecoder().decode(auth.substring(6))); // 分离email:password int pos = eap.indexOf(':'); String email = eap.substring(0, pos); String passwd = eap.substring(pos + 1); // 验证: UserProfileEntity p = userService.signin(email, passwd); return p.userId; } throw new ApiException(ApiError.AUTH_SIGNIN_FAILED, "Invalid Authorization header.");}
在ApiFilter
中完成认证后,使用UserContext
传递用户ID:
public class ApiFilter { @Override public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain) throws IOException, ServletException { // 尝试认证用户: String authHeader = req.getHeader("Authorization"); Long userId = authHeader == null ? null : parseUserFromAuthorization(authHeader); if (userId == null) { // 匿名身份: chain.doFilter(req, resp); } else { // 用户身份: try (UserContext ctx = new UserContext(userId)) { chain.doFilter(req, resp); } } }}
Basic模式很简单,需要注意的是用户名:口令
使用:
分隔,然后整个串用Base64编码,因此,读取的时候需要先用Base64解码。
虽然Basic模式并不安全,但是有了一种基本的认证模式,我们就可以把API-定序-交易串起来了。后续我们再继续添加其他认证模式。
编写API Controller
对于认证用户的操作,例如,查询资产余额,可通过UserContext
获取当前用户,然后通过交易引擎查询并返回用户资产余额:
@ResponseBody@GetMapping(value = "/assets", produces = "application/json")public String getAssets() throws IOException { Long userId = UserContext.getRequiredUserId(); return tradingEngineApiProxyService.get("/internal/" + userId + "/assets");}
因为交易引擎返回的结果就是JSON字符串,没必要先反序列化再序列化,可以以String
的方式直接返回给客户端,需要标注@ResponseBody
表示不要对String
再进行序列化处理。
对于无需认证的操作,例如,查询公开市场的订单簿,可以直接返回Redis缓存结果:
@ResponseBody@GetMapping(value = "/orderBook", produces = "application/json")public String getOrderBook() { String data = redisService.get(RedisCache.Key.ORDER_BOOK); return data == null ? OrderBookBean.EMPTY : data;}
但是对于创建订单的请求,处理就麻烦一些,因为API收到请求后,仅仅通过消息系统给定序系统发了一条消息。消息系统本身并不是类似HTTP的请求-响应模式,我们拿不到消息处理的结果。这里先借助Spring的异步响应模型DeferredResult
,再借助Redis的pub/sub模型,当API发送消息时,使用全局唯一refId
跟踪消息,当交易引擎处理完订单请求后,向Redis发送pub事件,API收到Redis推送的事件后,根据refId
找到DeferredResult
,设置结果后由Spring异步返回给客户端:
┌─────────┐ ┌─────────┐──▶│ API │◀────────────────│ Redis │ └─────────┘ └─────────┘ │ ▲ ▼ │ ┌─────────┐ │ │ MQ │ pub│ └─────────┘ │ │ │ ▼ │ ┌─────────┐ ┌─────────┐ ┌─────────┐ │Sequencer│──▶│ MQ │──▶│ Engine │ └─────────┘ └─────────┘ └─────────┘
代码实现如下:
public class TradingApiController { // 消息refId -> DeferredResult: Map<String, DeferredResult<ResponseEntity<String>>> deferredResultMap = new ConcurrentHashMap<>();
@Autowired RedisService redisService;
@PostConstruct public void init() { // 订阅Redis: this.redisService.subscribe(RedisCache.Topic.TRADING_API_RESULT, this::onApiResultMessage); }
@PostMapping(value = "/orders", produces = "application/json") @ResponseBody public DeferredResult<ResponseEntity<String>> createOrder(@RequestBody OrderRequestBean orderRequest) { final Long userId = UserContext.getRequiredUserId(); // 消息的Reference ID: final String refId = IdUtil.generateUniqueId(); var event = new OrderRequestEvent(); event.refId = refId; event.userId = userId; event.direction = orderRequest.direction; event.price = orderRequest.price; event.quantity = orderRequest.quantity; event.createdAt = System.currentTimeMillis(); // 如果超时则返回: ResponseEntity<String> timeout = new ResponseEntity<>(getTimeoutJson(), HttpStatus.BAD_REQUEST); // 正常异步返回: DeferredResult<ResponseEntity<String>> deferred = new DeferredResult<>(500, timeout); // 0.5秒超时 deferred.onTimeout(() -> { this.deferredResultMap.remove(event.refId); }); // 根据refId跟踪消息处理结果: this.deferredResultMap.put(event.refId, deferred); // 发送消息: sendMessage(event); return deferred; }
// 收到Redis的消息结果推送: public void onApiResultMessage(String msg) { ApiResultMessage message = objectMapper.readValue(msg, ApiResultMessage.class); if (message.refId != null) { // 根据消息refId查找DeferredResult: DeferredResult<ResponseEntity<String>> deferred = this.deferredResultMap.remove(message.refId); if (deferred != null) { // 找到DeferredResult后设置响应结果: ResponseEntity<String> resp = new ResponseEntity<>(JsonUtil.writeJson(message.result), HttpStatus.OK); deferred.setResult(resp); } } }}
如何实现API Key认证
身份认证的本质是确认用户身份。用户身份其实并不包含密码,而是用户ID、email、名字等信息,可以看作数据库中的user_profiles
表:
userId | name | |
---|---|---|
100 | bob@example.com | Bob |
101 | alice@example.com | alice |
102 | cook@example.com | Cook |
使用口令认证时,通过添加一个password_auths
表,存储哈希后的口令,并关联至某个用户ID,即可完成口令认证:
userId | random | passwd |
---|---|---|
100 | c47snXI | 7b6da12c… |
101 | djEqC2I | f7b68248… |
并不是每个用户都必须有口令,没有口令的用户仅仅表示该用户不能通过口令来认证身份,但完全可以通过其他方式认证。
使用API Key认证同理,通过添加一个api_auths
表,存储API Key、API Secret并关联至某个用户ID:
userId | apiKey | apiSecret |
---|---|---|
101 | 5b503947f4f5d34a | e57c677d4ab4c5a4 |
102 | 13a867e8da13c7f6 | 92e41573e833ae13 |
102 | 341a8e60baf5b824 | 302c9e195826267f |
用户使用API Key认证时,提供API Key,以及用API Secret计算的Hmac哈希,服务器验证Hmac哈希后,就可以确认用户身份,因为其他人不知道该用户的API Secret,无法计算出正确的Hmac。
发送API Key认证时,可以定义如下的HTTP头:
API-Key: 5b503947f4f5d34aAPI-Timestamp: 20220726T092137Z <- 防止重放攻击的时间戳API-Signature: d7a567b6cab85bcd
计算签名的原始输入可以包括HTTP Method、Path、Timestamp、Body等关键信息,具体格式可参考AWS API签名方式。
一个用户可以关联多个API Key认证,还可以给每个API Key附加特定权限,例如只读权限,这样用API Key认证就更加安全。
内部系统调用API如何实现用户认证
很多时候,内部系统也需要调用API,并且需要以特定用户的身份调用API。让内部系统去读用户的口令或者API Key都是不合理的,更好的方式是使用一次性Token,还是利用Authorization头的Bearer模式:
Authorization: Bearer 5NPtI6LW...
构造一次性Token可以用userId:expires:hmac
,内部系统和API共享同一个Hmac Key,就可以正确计算并验证签名。外部用户因为无法获得Hmac Key而无法伪造Token。
如何跟踪API性能
可以使用Spring提供的HandlerInterceptor
和DeferredResultProcessingInterceptor
跟踪API性能,它们分别用于拦截同步API和异步API。
参考源码
小结
API系统负责认证用户身份,并提供一个唯一的交易入口。
24.6 设计行情系统
行情系统用来生成公开市场的历史数据,主要是K线图。
K线图的数据来源是交易引擎成交产生的一个个Tick。一个K线包括OHLC这4个价格数据。在一个时间段内,第一个Tick的价格是Open,最后一个Tick的价格是Close,最高的价格是High,最低的价格是Low:
High ──▶│ │ ┌─┴─┐◀── Close │ │ │ │Open ──▶└─┬─┘ │ │ Low ──▶│
给定一组Tick集合,就可以汇总成一个K线,对应一个Bar结构:
public class AbstractBarEntity { public long startTime; // 开始时间 public BigDecimal openPrice; // 开始价格 public BigDecimal highPrice; // 最高价格 public BigDecimal lowPrice; // 最低价格 public BigDecimal closePrice; // 结束价格 public BigDecimal quantity; // 成交数量}
通常我们需要按1秒、1分钟、1小时和1天来生成不同类型的K线,因此,行情系统的功能就是不断从消息系统中读取Tick,合并,然后输出不同类型的K线。
此外,API系统还需要提供查询公开市场信息的功能。对于最近的成交信息和K线图,可以缓存在Redis中,对于较早时期的K线图,可以通过数据库查询。因此,行情系统需要将生成的K线保存到数据库中,同时负责不断更新Redis的缓存。
对于最新成交信息,我们在Redis中用一个List表示,它的每一个元素是一个序列号后的JSON:
["{...}", "{...}", "{...}"...]
如果有新的Tick产生,就需要把它们追加到列表尾部,同时将最早的Tick删除,以便维护一个最近成交的列表。
直接读取Redis列表,操作后再写回Redis是可以的,但比较麻烦。这里我们直接用Lua脚本更新最新Tick列表。Redis支持将一个Lua脚本加载后,直接在Redis内部执行脚本:
local KEY_LAST_SEQ = '_TickSeq_' -- 上次更新的SequenceIDlocal LIST_RECENT_TICKS = KEYS[1] -- 最新Ticks的Key
local seqId = ARGV[1] -- 输入的SequenceIDlocal jsonData = ARGV[2] -- 输入的JSON字符串表示的tick数组:"["{...}","{...}",...]"local strData = ARGV[3] -- 输入的JSON字符串表示的tick数组:"[{...},{...},...]"
-- 获取上次更新的sequenceId:local lastSeqId = redis.call('GET', KEY_LAST_SEQ)local ticks, len;
if not lastSeqId or tonumber(seqId) > tonumber(lastSeqId) then -- 广播: redis.call('PUBLISH', 'notification', '{"type":"tick","sequenceId":' .. seqId .. ',"data":' .. jsonData .. '}') -- 保存当前sequence id: redis.call('SET', KEY_LAST_SEQ, seqId) -- 更新最新tick列表: ticks = cjson.decode(strData) len = redis.call('RPUSH', LIST_RECENT_TICKS, unpack(ticks)) if len > 100 then -- 裁剪LIST以保存最新的100个Tick: redis.call('LTRIM', LIST_RECENT_TICKS, len-100, len-1) end return trueend-- 无更新返回falsereturn false
在API中,要获取最新成交信息,我们直接从Redis缓存取出列表,然后拼接成一个JSON字符串:
@ResponseBody@GetMapping(value = "/ticks", produces = "application/json")public String getRecentTicks() { List<String> data = redisService.lrange(RedisCache.Key.RECENT_TICKS, 0, -1); if (data == null || data.isEmpty()) { return "[]"; } StringJoiner sj = new StringJoiner(",", "[", "]"); for (String t : data) { sj.add(t); } return sj.toString();}
用Lua脚本更新Redis缓存还有一个好处,就是Lua脚本执行的时候,不但可以更新List,还可以通过Publish命令广播事件,后续我们编写基于WebSocket的推送服务器时,直接监听Redis广播,就可以主动向浏览器推送Tick更新的事件。
类似的,针对每一种K线,我们都在Redis中用ZScoredSet存储,用K线的开始时间戳作为Score。更新K线时,从每种ZScoredSet中找出Score最大的Bar结构,就是最后一个Bar,然后尝试更新。如果可以持久化这个Bar就返回,如果可以合并这个Bar就刷新ZScoreSet,用Lua脚本实现如下:
local function merge(existBar, newBar) existBar[3] = math.max(existBar[3], newBar[3]) -- 更新High Price existBar[4] = math.min(existBar[4], newBar[4]) -- 更新Low Price existBar[5] = newBar[5] -- close existBar[6] = existBar[6] + newBar[6] -- 更新quantityend
local function tryMergeLast(barType, seqId, zsetBars, timestamp, newBar) local topic = 'notification' local popedScore, popedBar -- 查找最后一个Bar: local poped = redis.call('ZPOPMAX', zsetBars) if #poped == 0 then -- ZScoredSet无任何bar, 直接添加: redis.call('ZADD', zsetBars, timestamp, cjson.encode(newBar)) redis.call('PUBLISH', topic, '{"type":"bar","resolution":"' .. barType .. '","sequenceId":' .. seqId .. ',"data":' .. cjson.encode(newBar) .. '}') else popedBar = cjson.decode(poped[1]) popedScore = tonumber(poped[2]) if popedScore == timestamp then -- 合并Bar并发送通知: merge(popedBar, newBar) redis.call('ZADD', zsetBars, popedScore, cjson.encode(popedBar)) redis.call('PUBLISH', topic, '{"type":"bar","resolution":"' .. barType .. '","sequenceId":' .. seqId .. ',"data":' .. cjson.encode(popedBar) .. '}') else -- 可持久化最后一个Bar,生成新的Bar: if popedScore < timestamp then redis.call('ZADD', zsetBars, popedScore, cjson.encode(popedBar), timestamp, cjson.encode(newBar)) redis.call('PUBLISH', topic, '{"type":"bar","resolution":"' .. barType .. '","sequenceId":' .. seqId .. ',"data":' .. cjson.encode(newBar) .. '}') return popedBar end end end return nilend
local seqId = ARGV[1]local KEY_BAR_SEQ = '_BarSeq_'
local zsetBars, topics, barTypeStartTimeslocal openPrice, highPrice, lowPrice, closePrice, quantitylocal persistBars = {}
-- 检查sequence:local seq = redis.call('GET', KEY_BAR_SEQ)if not seq or tonumber(seqId) > tonumber(seq) then zsetBars = { KEYS[1], KEYS[2], KEYS[3], KEYS[4] } barTypeStartTimes = { tonumber(ARGV[2]), tonumber(ARGV[3]), tonumber(ARGV[4]), tonumber(ARGV[5]) } openPrice = tonumber(ARGV[6]) highPrice = tonumber(ARGV[7]) lowPrice = tonumber(ARGV[8]) closePrice = tonumber(ARGV[9]) quantity = tonumber(ARGV[10])
local i, bar local names = { 'SEC', 'MIN', 'HOUR', 'DAY' } -- 检查是否可以merge: for i = 1, 4 do bar = tryMergeLast(names[i], seqId, zsetBars[i], barTypeStartTimes[i], { barTypeStartTimes[i], openPrice, highPrice, lowPrice, closePrice, quantity }) if bar then persistBars[names[i]] = bar end end redis.call('SET', KEY_BAR_SEQ, seqId) return cjson.encode(persistBars)end
redis.log(redis.LOG_WARNING, 'sequence ignored: exist seq => ' .. seq .. ' >= ' .. seqId .. ' <= new seq')
return '{}'
接下来我们编写QuotationService
,初始化的时候加载Redis脚本,接收到Tick消息时调用脚本更新Tick和Bar,然后持久化Tick和Bar,代码如下:
@Componentpublic class QuotationService {
@Autowired RedisService redisService;
@Autowired MessagingFactory messagingFactory;
MessageConsumer tickConsumer;
private String shaUpdateRecentTicksLua = null; private String shaUpdateBarLua = null;
@PostConstruct public void init() throws Exception { // 加载Redis脚本: this.shaUpdateRecentTicksLua = this.redisService.loadScriptFromClassPath("/redis/update-recent-ticks.lua"); this.shaUpdateBarLua = this.redisService.loadScriptFromClassPath("/redis/update-bar.lua"); // 接收Tick消息: String groupId = Messaging.Topic.TICK.name() + "_" + IpUtil.getHostId(); this.tickConsumer = messagingFactory.createBatchMessageListener(Messaging.Topic.TICK, groupId, this::processMessages); }
// 处理接收的消息: public void processMessages(List<AbstractMessage> messages) { for (AbstractMessage message : messages) { processMessage((TickMessage) message); } }
// 处理一个Tick消息: void processMessage(TickMessage message) { // 对一个Tick消息中的多个Tick先进行合并: final long createdAt = message.createdAt; StringJoiner ticksStrJoiner = new StringJoiner(",", "[", "]"); StringJoiner ticksJoiner = new StringJoiner(",", "[", "]"); BigDecimal openPrice = BigDecimal.ZERO; BigDecimal closePrice = BigDecimal.ZERO; BigDecimal highPrice = BigDecimal.ZERO; BigDecimal lowPrice = BigDecimal.ZERO; BigDecimal quantity = BigDecimal.ZERO; for (TickEntity tick : message.ticks) { String json = tick.toJson(); ticksStrJoiner.add("\"" + json + "\""); ticksJoiner.add(json); if (openPrice.signum() == 0) { openPrice = tick.price; closePrice = tick.price; highPrice = tick.price; lowPrice = tick.price; } else { // open price is set: closePrice = tick.price; highPrice = highPrice.max(tick.price); lowPrice = lowPrice.min(tick.price); } quantity = quantity.add(tick.quantity); } // 计算应该合并的每种类型的Bar的开始时间: long sec = createdAt / 1000; long min = sec / 60; long hour = min / 60; long secStartTime = sec * 1000; long minStartTime = min * 60 * 1000; long hourStartTime = hour * 3600 * 1000; long dayStartTime = Instant.ofEpochMilli(hourStartTime).atZone(zoneId).withHour(0).toEpochSecond() * 1000;
// 更新Tick缓存: String ticksData = ticksJoiner.toString(); Boolean tickOk = redisService.executeScriptReturnBoolean(this.shaUpdateRecentTicksLua, new String[] { RedisCache.Key.RECENT_TICKS }, new String[] { String.valueOf(this.sequenceId), ticksData, ticksStrJoiner.toString() }); if (!tickOk.booleanValue()) { logger.warn("ticks are ignored by Redis."); return; } // 保存Tick至数据库: saveTicks(message.ticks);
// 更新Redis缓存的各种类型的Bar: String strCreatedBars = redisService.executeScriptReturnString(this.shaUpdateBarLua, new String[] { RedisCache.Key.SEC_BARS, RedisCache.Key.MIN_BARS, RedisCache.Key.HOUR_BARS, RedisCache.Key.DAY_BARS }, new String[] { // ARGV String.valueOf(this.sequenceId), // sequence id String.valueOf(secStartTime), // sec-start-time String.valueOf(minStartTime), // min-start-time String.valueOf(hourStartTime), // hour-start-time String.valueOf(dayStartTime), // day-start-time String.valueOf(openPrice), // open String.valueOf(highPrice), // high String.valueOf(lowPrice), // low String.valueOf(closePrice), // close String.valueOf(quantity) // quantity }); Map<BarType, BigDecimal[]> barMap = JsonUtil.readJson(strCreatedBars, TYPE_BARS); if (!barMap.isEmpty()) { // 保存Bar: SecBarEntity secBar = createBar(SecBarEntity::new, barMap.get(BarType.SEC)); MinBarEntity minBar = createBar(MinBarEntity::new, barMap.get(BarType.MIN)); HourBarEntity hourBar = createBar(HourBarEntity::new, barMap.get(BarType.HOUR)); DayBarEntity dayBar = createBar(DayBarEntity::new, barMap.get(BarType.DAY)); saveBars(secBar, minBar, hourBar, dayBar); } }}
K线是一组Bar按ZSet缓存在Redis中,Score就是Bar的开始时间。更新Bar时,同时广播通知,以便后续推送。要查询某种K线图,在API中,需要传入开始和结束的时间戳,通过ZRANGE命令返回排序后的List:
String getBars(String key, long start, long end) { List<String> data = redisService.zrangebyscore(key, start, end); if (data == null || data.isEmpty()) { return "[]"; } StringJoiner sj = new StringJoiner(",", "[", "]"); for (String t : data) { sj.add(t); } return sj.toString();}
参考源码
小结
行情系统是典型的少量写、大量读的模式,非常适合缓存。通过编写Lua脚本可使得更新Redis更加简单。
24.7 设计推送系统
推送系统负责将公开市场的实时信息,包括订单簿、最新成交、最新K线等推送给客户端,对于用户的订单,还需要将成交信息推送给指定用户。FIX(Financial Information eXchange)协议是金融交易的一种实时化通讯协议,但是它非常复杂,而且不同版本的规范也不同。对于Warp Exchange来说,我们先实现一版简单的基于WebSocket推送JSON格式的通知。
和普通Web应用不同的是,基于Servlet的线程池模型不能高效地支持成百上千的WebSocket长连接。Java提供了NIO能充分利用Linux系统的epoll机制高效支持大量的长连接,但是直接使用NIO的接口非常繁琐,通常我们会选择基于NIO的Netty服务器。直接使用Netty其实仍然比较繁琐,基于Netty开发我们可以选择:
- Spring WebFlux:封装了Netty并实现Reactive接口;
- Vert.x:封装了Netty并提供简单的API接口。
这里我们选择Vert.x,因为它的API更简单。
Vert.x本身包含若干模块,根据需要,我们引入3个组件:
<dependency> <groupId>io.vertx</groupId> <artifactId>vertx-core</artifactId> <version>${vertx.version}</version></dependency>
<dependency> <groupId>io.vertx</groupId> <artifactId>vertx-web</artifactId> <version>${vertx.version}</version></dependency>
<dependency> <groupId>io.vertx</groupId> <artifactId>vertx-redis-client</artifactId> <version>${vertx.version}</version></dependency>
我们先编写推送服务的入口:
package com.itranswarp.exchange.push;
@SpringBootApplication// 禁用数据库自动配置 (无DataSource, JdbcTemplate...)@EnableAutoConfiguration(exclude = DataSourceAutoConfiguration.class)public class PushApplication { public static void main(String[] args) { System.setProperty("vertx.disableFileCPResolving", "true"); System.setProperty("vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory"); SpringApplication app = new SpringApplication(PushApplication.class); // 禁用Spring的Web: app.setWebApplicationType(WebApplicationType.NONE); app.run(args); }}
上述代码仍然是一个标准的Spring Boot应用,因为我们希望利用Spring Cloud Config读取配置。由于我们不使用Spring自身的Web功能,因此需要禁用Spring的Web功能。推送服务本身并不需要访问数据库,因此禁用数据库自动配置。最后,我们把PushApplication
放在com.itranswarp.exchange.push
包下面,以避免自动扫描到com.itranswarp.exchange
包下的组件(如RedisService)。
下一步是编写PushService
,注意它是一个Spring组件,由Spring初始化:
@Componentpublic class PushService extends LoggerSupport { @Value("${server.port}") private int serverPort;
@Value("${exchange.config.hmac-key}") String hmacKey;
@Value("${spring.redis.standalone.host:localhost}") private String redisHost;
@Value("${spring.redis.standalone.port:6379}") private int redisPort;
@Value("${spring.redis.standalone.password:}") private String redisPassword;
@Value("${spring.redis.standalone.database:0}") private int redisDatabase = 0;
private Vertx vertx;
@PostConstruct public void startVertx() { // TODO: init Vert.x }}
由Spring初始化该组件的目的是注入各种配置。在初始化方法中,我们就可以启动Vert.x:
@PostConstructpublic void startVertx() { // 启动Vert.x: this.vertx = Vertx.vertx();
// 创建一个Vert.x Verticle组件: var push = new PushVerticle(this.hmacKey, this.serverPort); vertx.deployVerticle(push);
// 连接到Redis: String url = "redis://" + (this.redisPassword.isEmpty() ? "" : ":" + this.redisPassword + "@") + this.redisHost + ":" + this.redisPort + "/" + this.redisDatabase; Redis redis = Redis.createClient(vertx, url);
redis.connect().onSuccess(conn -> { // 事件处理: conn.handler(response -> { // 收到Redis的PUSH: if (response.type() == ResponseType.PUSH) { int size = response.size(); if (size == 3) { Response type = response.get(2); if (type instanceof BulkType) { // 收到PUBLISH通知: String msg = type.toString(); // 由push verticle组件处理该通知: push.broadcast(msg); } } } }); // 订阅Redis的Topic: conn.send(Request.cmd(Command.SUBSCRIBE).arg(RedisCache.Topic.NOTIFICATION)).onSuccess(resp -> { logger.info("subscribe ok."); }).onFailure(err -> { logger.error("subscribe failed.", err); System.exit(1); }); }).onFailure(err -> { logger.error("connect to redis failed.", err); System.exit(1); });}
Vert.x用Verticle
表示一个组件,我们编写PushVerticle
来处理WebSocket连接:
public class PushVerticle extends AbstractVerticle { @Override public void start() { // 创建VertX HttpServer: HttpServer server = vertx.createHttpServer();
// 创建路由: Router router = Router.router(vertx);
// 处理请求 GET /notification: router.get("/notification").handler(requestHandler -> { HttpServerRequest request = requestHandler.request(); // 从token参数解析userId: Supplier<Long> supplier = () -> { String tokenStr = request.getParam("token"); if (tokenStr != null && !tokenStr.isEmpty()) { AuthToken token = AuthToken.fromSecureString(tokenStr, this.hmacKey); if (!token.isExpired()) { return token.userId(); } } return null; }; final Long userId = supplier.get(); logger.info("parse user id from token: {}", userId); // 将连接升级到WebSocket: request.toWebSocket(ar -> { if (ar.succeeded()) { initWebSocket(ar.result(), userId); } }); });
// 处理请求 GET /actuator/health: router.get("/actuator/health").respond( ctx -> ctx.response().putHeader("Content-Type", "application/json").end("{\"status\":\"UP\"}"));
// 其他请求返回404错误: router.get().respond(ctx -> ctx.response().setStatusCode(404).setStatusMessage("No Route Found").end());
// 绑定路由并监听端口: server.requestHandler(router).listen(this.serverPort, result -> { if (result.succeeded()) { logger.info("Vertx started on port(s): {} (http) with context path ''", this.serverPort); } else { logger.error("Start http server failed on port " + this.serverPort, result.cause()); vertx.close(); System.exit(1); } }); }}
在PushVerticle
中,start()
方法由Vert.x回调。我们在start()
方法中主要干这么几件事:
- 创建基于Vert.x的HTTP服务器(内部使用Netty);
- 创建路由;
- 绑定一个路径为
/notification
的GET请求,将其升级为WebSocket连接; - 绑定其他路径的GET请求;
- 开始监听指定端口号。
在处理/notification
时,我们尝试从URL的token参数解析出用户ID,这样我们就无需访问数据库而获得了当前连接的用户。升级到WebSocket连接后,再调用initWebSocket()
继续处理WebSocket连接:
public class PushVerticle extends AbstractVerticle { // 所有Handler: Map<String, Boolean> handlersSet = new ConcurrentHashMap<>(1000);
// 用户ID -> Handlers Map<Long, Set<String>> userToHandlersMap = new ConcurrentHashMap<>(1000);
// Handler -> 用户ID Map<String, Long> handlerToUserMap = new ConcurrentHashMap<>(1000);
void initWebSocket(ServerWebSocket websocket, Long userId) { // 获取一个WebSocket关联的Handler ID: String handlerId = websocket.textHandlerID(); // 处理输入消息: websocket.textMessageHandler(str -> { logger.info("text message: " + str); }); websocket.exceptionHandler(t -> { logger.error("websocket error: " + t.getMessage(), t); }); // 关闭连接时: websocket.closeHandler(e -> { unsubscribeClient(handlerId); unsubscribeUser(handlerId, userId); }); subscribeClient(handlerId); subscribeUser(handlerId, userId); }
void subscribeClient(String handlerId) { this.handlersSet.put(handlerId, Boolean.TRUE); }
void unsubscribeClient(String handlerId) { this.handlersSet.remove(handlerId); }
void subscribeUser(String handlerId, Long userId) { if (userId == null) { return; } handlerToUserMap.put(handlerId, userId); Set<String> set = userToHandlersMap.get(userId); if (set == null) { set = new HashSet<>(); userToHandlersMap.put(userId, set); } set.add(handlerId); }
void unsubscribeUser(String handlerId, Long userId) { if (userId == null) { return; } handlerToUserMap.remove(handlerId); Set<String> set = userToHandlersMap.get(userId); if (set != null) { set.remove(handlerId); } }}
在Vert.x中,每个WebSocket连接都有一个唯一的Handler标识,以String
表示。我们用几个Map
保存Handler和用户ID的映射关系,当关闭连接时,将对应的映射关系删除。
最后一个关键方法broadcast()
由PushService
中订阅的Redis推送时触发,该方法用于向用户主动推送通知:
public void broadcast(String text) { NotificationMessage message = JsonUtil.readJson(text, NotificationMessage.class); if (message.userId == null) { // 没有用户ID时,推送给所有连接: EventBus eb = vertx.eventBus(); for (String handler : this.handlersSet.keySet()) { eb.send(handler, text); } } else { // 推送给指定用户: Set<String> handlers = this.userToHandlersMap.get(message.userId); if (handlers != null) { EventBus eb = vertx.eventBus(); for (String handler : handlers) { eb.send(handler, text); } } }}
当Redis收到PUBLISH
调用后,它自动将String
表示的JSON数据推送给所有订阅端。我们在PushService
中订阅了notification
这个Topic,然后通过broadcast()
推送给WebSocket客户端。对于一个NotificationMessage
,如果设置了userId
,则推送给指定用户,适用于订单成交等针对用户ID的通知;如果没有设置userId
,则推送给所有用户,适用于公开市场信息的推送。
整个推送服务仅包括3个Java文件,我们就实现了基于Redis和WebSocket的高性能推送。
参考源码
小结
要高效处理大量WebSocket连接,我们选择基于Netty的Vert.x框架,可以通过少量代码配合Redis实现推送。
24.8 编写UI
我们已经实现了API系统、交易系统、定序系统、行情系统和推送系统,最后就差一个UI系统,让用户可以登录并通过浏览器下订单。UI系统就是一个标准的Web系统,相对比较简单。
UI系统本质上是一个MVC模型的Web系统,我们先引入一个视图的第三方依赖:
<dependency> <groupId>io.pebbletemplates</groupId> <artifactId>pebble-spring-boot-starter</artifactId> <version>${pebble.version}</version></dependency>
在ui.yml
加入最基本的配置:
pebble: prefix: /templates/ suffix: .html
注意到视图页面都放在src/main/resources/templates/
目录下。编写MvcController
,实现登录功能:
@Controllerpublic class MvcController extends LoggerSupport { // 显示登录页 @GetMapping("/signin") public ModelAndView signin(HttpServletRequest request) { if (UserContext.getUserId() != null) { return redirect("/"); } return prepareModelAndView("signin"); }
// 登录 @PostMapping("/signin") public ModelAndView signIn(@RequestParam("email") String email, @RequestParam("password") String password, HttpServletRequest request, HttpServletResponse response) { try { UserProfileEntity userProfile = userService.signin(email, password); // 登录成功后设置Cookie: AuthToken token = new AuthToken(userProfile.userId, System.currentTimeMillis() + 1000 * cookieService.getExpiresInSeconds()); cookieService.setSessionCookie(request, response, token); } catch (ApiException e) { // 登录失败: return prepareModelAndView("signin", Map.of("email", email, "error", "Invalid email or password.")); } catch (Exception e) { // 登录失败: return prepareModelAndView("signin", Map.of("email", email, "error", "Internal server error.")); } // 登录成功跳转: return redirect("/"); }}
登录成功后,设置一个Cookie代表用户身份,以userId:expiresAt:hash
表示。由于计算哈希引入了HmacKey
,因此,客户端无法伪造Cookie。
继续编写UIFilter
,用于验证Cookie并把特定用户的身份绑定到UserContext
中:
public class UIFilter { @Override public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain) throws IOException, ServletException { // 查找Cookie: AuthToken auth = cookieService.findSessionCookie(req); Long userId = auth == null ? null : auth.userId(); try (UserContext ctx = new UserContext(userId)) { chain.doFilter(request, response); } }}
我们再编写一个ProxyFilter
,它的目的是将页面JavaScript对API的调用转发给API系统:
public class ProxyFilter { @Override public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { Long userId = UserContext.getUserId(); // 构造一次性Token: String authToken = null; if (userId != null) { AuthToken token = new AuthToken(userId, System.currentTimeMillis() + 60_000); authToken = "Bearer " + token.toSecureString(hmacKey); } // 转发到API并读取响应: String responseJson = null; try { if ("GET".equals(request.getMethod())) { Map<String, String[]> params = request.getParameterMap(); Map<String, String> query = params.isEmpty() ? null : convertParams(params); responseJson = tradingApiClient.get(String.class, request.getRequestURI(), authToken, query); } else if ("POST".equals(request.getMethod())) { responseJson = tradingApiClient.post(String.class, request.getRequestURI(), authToken, readBody(request)); } // 写入响应: response.setContentType("application/json;charset=utf-8"); PrintWriter pw = response.getWriter(); pw.write(responseJson); pw.flush(); } catch (ApiException e) { // 写入错误响应: writeApiException(request, response, e); } catch (Exception e) { // 写入错误响应: writeApiException(request, response, new ApiException(ApiError.INTERNAL_SERVER_ERROR, null, e.getMessage())); } }}
把ProxyFilter
挂载到/api/*
,通过UI转发请求的目的是简化页面JavaScript调用API,一是不再需要跨域,二是UI已经经过了登录认证,转发过程中自动生成一次性Token来调用API,这样JavaScript不再关心如何生成Authorization
头。
下面我们就可以开始编写页面了:
- signin.html:登录页;
- signup.html:注册页;
- index.html:交易页。
页面功能主要由JavaScript实现,我们选择Vue前端框架,最终实现效果如下:
最后,在后台注册时,如果检测到本地开发环境,就自动调用内部API给用户添加一些资产,否则新注册用户无法交易。
参考源码
小结
UI系统是标准的Web系统,除了注册、登录外,主要交易功能均由页面JavaScript实现。UI系统本身不是交易入口,它通过转发JavaScript请求至真正的API入口。
24.9 项目总结
现在,我们已经成功地完成了一个7x24运行的证券交易系统。虽然实现了基本功能,但仍有很多可改进的地方。
网关
直接给用户暴露API和UI是不合适的,通常我们会选择一个反向代理充当网关。可以使用Spring Cloud Gateway来实现网关。Spring Cloud Gateway是基于Netty的异步服务器,允许我们编写一系列过滤器来实现黑名单、权限检查、限流等功能。
远程调用
在系统内部,我们直接通过HTTP请求实现了远程调用,因为暴露的接口较少。如果接口比较多,可以考虑使用RPC调用,例如Spring Cloud OpenFeign。Spring Cloud OpenFeign把REST请求封装为Java接口方法,实现了一种声明式的RPC调用。也可以考虑更加通用的gRPC。
系统监控
要监控系统状态、性能等实时信息,我们需要构造一个监控系统。从零开始是不现实的,选择一个通用的标准协议比使用JMX要更简单。StatsD就是目前最流行的监控方案,它的基本原理是:
┌ ─ ─ ─ ─ ─ ─ ─ ┐ ┌───────────┐│ │Application│ │ └───────────┘│ │ │ UDP││ ▼ │ ┌───────────┐ ┌───────────┐│ │ StatsD │─┼────▶│ Server │ └───────────┘ └───────────┘└ ─ ─ ─ ─ ─ ─ ─ ┘
应用程序本身负责收集监控数据,然后以UDP协议发给StatsD守护进程,StatsD进程通常和应用程序运行在同一台机器上,它非常轻量级,并且StatsD是否运行都不影响应用程序的正常运行(因为UDP协议只管发不管能不能收到)。如果StatsD进程在运行中,它就把监控数据实时发送给聚合服务器如Graphite,再以可视化的形式展示出来。
StatsD是一个解决方案,既可以自己用开源组件搭建,又可以选择第三方商业服务商,例如DataDog。应用程序自身的数据采集则需要根据使用的服务商确定。如果使用DataDog,它会提供一个dd-java-agent.jar
,在启动应用程序时,以agent的方式注入到JVM中:
$ java -javaagent:dd-java-agent.jar -jar app.jar
再通过引入DataDog提供的API:
<dependency> <groupId>com.datadoghq</groupId> <artifactId>dd-trace-api</artifactId> <version>{version}</version></dependency>
就可以实现数据采集。DataDog提供的agent除了能采集应用程序的数据,还可以直接监控JVM、Linux系统,能大大简化监控配置。
对于分布式调用,例如UI调用API,API调用Engine,还可以集成Spring Cloud Sleuth来监控链路。它通过在入口调用每次生成一个唯一ID来跟踪链路,采集数据可直接与StatsD集成。
密钥管理
对于很多涉及密钥的配置来说,如数据库密码,系统AES密码,管理员口令等,直接存放在配置文件或数据库中都是不安全的。使用专业的密钥管理软件如Vault可以更安全地管理密钥。Spring Cloud Vault就是用于从Vault读取密钥,适合对安全性要求特别高的项目。
小结
至此,我们已经从Java入门开始,学习了Java基础、JavaEE开发,重点介绍了Spring、Spring Boot和Spring Cloud,并通过一个实战项目,完成了分布式应用程序的开发。相信学到这里的你,已经成为了一个优秀的系统架构师!