This Week in Rust (TWiR)
Rust 语言周刊中文翻译计划, 第 605 期本文翻译自 Rex Wang 的博客文章 https://wangjunfei.com/2025/06/18/Rewriting-Kafka-in-Rust-Async-Insights-and-Lessons-Learned/, 英文原文版权由原作者所有, 中文翻译版权遵照 CC BY-NC-SA 协议开放. 如原作者有异议请邮箱联系.
相关术语翻译依照 Rust 语言术语中英文对照表.
囿于译者自身水平, 译文虽已力求准确, 但仍可能词不达意, 欢迎批评指正.
2025 年 6 月 29 日上午, 于广州.
Rewriting Kafka in Rust Async: Insights and Lessons Learned in Rust
使用异步 Rust 重写 Kafka: 见解与经验教训
TL; DR
-
Rewriting Kafka in Rust not only leverages Rust’s language advantages but also allows redesigning for superior performance and efficiency.
使用 Rust 重写 Kafka 不仅受益于 Rust 的语言优势, 还可以通过重新设计以获得卓越的性能和效率.
-
Design Experience: Avoid Turning Functions into async Whenever Possible
设计经验: 避免盲目使用异步方法.
-
Design Experience: Minimize the Number of Tokio Tasks
设计经验: 最大限度地减少 Tokio 任务的数量.
-
Design Experience: Judicious Use of Unsafe Code for Performance-Critical Paths
设计经验: 对性能关键型路径明智地使用 unsafe 代码.
-
Design Experience: Separating Mutable and Immutable Data to Optimize Lock Granularity
设计经验: 分离可变和不可变数据以优化锁粒度.
-
Design Experience: Separate Asynchronous and Synchronous Data Operations to Optimize Lock Usage
设计经验: 分离异步和同步数据作以优化锁的使用.
-
Design Experience: Employ Static Dispatch in Performance-Critical Paths Whenever Possible
设计经验: 尽可能在性能关键型路径中使用静态调度.
At the project’s inception, I initially considered implementing StoneMQ using C/C++. After grappling with the frustrating and persistent off-heap memory leak issues encountered in DDMQ (built on Pulsar) within the JVM environment, I realized that relying on off-heap memory for caching or buffering in the JVM is far from ideal. Having operated the Mafka cluster (based on Kafka) for nearly five years, we never faced memory-related problems such as abnormal memory growth, prolonged garbage collection times, or memory leaks. When a memory leak occurs on a single node in a cluster, memory usage escalates uncontrollably, leaving you no choice but to perform periodic restarts—a daunting and risky task when managing thousands of nodes.
项目伊始, 我起初考虑使用 C/C++ 实现 StoneMQ. 在解决了 JVM 环境中 DDMQ (基于 Pulsar 构建) 遇到的令人沮丧且持续的堆外内存泄漏问题后, 我意识到在 JVM 中依赖堆外内存进行缓存或缓冲远非理想. Mafka 集群 (基于 Kafka) 运行了近五年, 我们从未遇到过内存相关问题, 例如内存异常增长、垃圾回收时间延长或内存泄漏. 当集群中的单个节点发生内存泄漏时, 内存使用量会不受控制地升级, 让您别无选择, 只能执行定期重启, 这在管理数千个节点时是一项艰巨且冒险的任务. (译者注: 笔误? 应当为遇到过内存泄露问题.)
(译者注: Mafka 是美团基于 Kafka 开发的一个消息队列产品, 原作者任职于美团点评基础架构部门.)
Kafka and Pulsar both run as services on Linux; more precisely, they operate within the JVM and have no direct interaction with the native Linux system. Strictly speaking, they do not belong to the domain of system-level programming. So why choose C/C++ as the underlying language? This choice clearly results in reduced development efficiency and increased complexity.
Kafka 和 Pulsar 在 Linux 上都作为服务运行; 更准确地说, 它们在 JVM 中运行, 与本机 Linux 系统没有直接交互. 严格来说, 它们不属于系统级编程的领域. 那么为什么选择 C/C++ 作为底层语言呢? 这种选择显然会导致开发效率降低和复杂性增加.
My perspective is that Kafka and Pulsar, as messaging queues, have become de facto industrial standards, having matured and stabilized over nearly eight years. Could we not reimplement them in a lower-level language to reap greater benefits in performance and stability? With some enhancements, might we transform them into first-class native Linux processes, achieving a level of integration and efficiency akin to that of the Linux kernel’s own services?
我的观点是, Kafka 和 Pulsar 作为消息队列产品, 在近八年的时间里已经成熟并稳定下来, 已经成为事实上的行业标准. 我们难道不能用低级语言重新实现它们, 以便在性能和稳定性方面获得更大的进步吗? 通过一些增强, 我们是否可以将它们转换为一流的原生 Linux 进程, 实现类似于 Linux 内核自身服务的集成和效率水平?
Through extensive research, it has become evident that C/C++ is gradually becoming a sunset language, nearing its twilight. Its successor, Rust, which emerged around 2010, has spent over a decade steadily conquering the domain of systems programming. Even the notoriously exacting Linus Torvalds could no longer ignore Rust’s presence; in a recent kernel update, he resolutely overcame opposition from the C community to formally integrate Rust into the Linux kernel mainline for the first time.
通过广泛的研究, 很明显, C/C++ 正在逐渐成为一种日落语言, 接近暮年. 它的继任者 Rust 出现在 2010 年左右, 花了十多年的时间稳步征服系统编程领域. 即使是出了名的苛刻的 Linus Torvalds 也无法再忽视 Rust 的存在; 在最近的一次内核更新中, 他毅然决然地克服了 C 社区的反对, 首次正式将 Rust 集成到 Linux 内核主线中.
Google has employed Rust for years in Android’s low-level development, claiming efficiency and safety several times superior to C++. The CTO of Microsoft Azure has even recommended that new projects move away from C/C++ in favor of Rust. Amazon AWS, notably, has recruited key Rust contributors and utilized Rust to build innovative projects such as the new S3 backend storage and the renowned Firecracker microVM.
多年来, Google 一直在 Android 的低级开发中使用 Rust, 声称其效率和安全性比 C++ 高出数倍. Microsoft Azure 的 CTO 甚至建议新项目从 C/C++ 转向 Rust. 值得注意的是, Amazon AWS 已经招募了关键的 Rust 贡献者, 并利用 Rust 构建了创新项目, 例如新的 S3 后端存储和著名的 Firecracker microVM.
Against this backdrop, I decisively chose Rust as the development language for StoneMQ. Admittedly, Rust has a steep learning curve, a common critique among developers. However, while challenging, it is by no means insurmountable. Moreover, system developers constitute a relatively small niche within the broader developer community, so widespread adoption is not imperative.
在此背景下, 我果断选择 Rust 作为 StoneMQ 的开发语言. 诚然, Rust的学习曲线陡峭, 这是开发者群体中常见的批评. 然而, 尽管具有挑战性, 但绝非不可克服. 此外, 系统开发者在更广泛的开发者社区中属于相对小众的领域, 因此并不需要大规模普及.
After a year of iterative development, learning Rust while coding, StoneMQ has encountered and anticipated numerous pitfalls. Here, I summarize some of these lessons, with plans to elaborate further in the future, hoping to aid others in avoiding similar missteps. (Repository)
经过一年的迭代开发, 边码代码边学习 Rust, StoneMQ 预料之中地遇到了许多陷阱. 在这里, 我总结了其中的一些教训, 并计划在未来进一步阐述, 希望能帮助其他人避免类似的失误. (存储库)
Though StoneMQ was developed concurrently with my Rust learning journey, it was built to exacting standards: first, to outperform Kafka in terms of performance; second, to maintain clear and readable code. Achieving these goals demanded meticulous attention to design and optimization throughout development. StoneMQ underwent two major restructurings and numerous minor refactorings—details of which can be explored through its extensive git commit history.
尽管 StoneMQ 与我的 Rust 学习之旅同时起步, 但它按照严格的标准构建: 首先, 在性能方面优于 Kafka; 第二, 保持代码清晰可读. 要实现这些目标, 需要在整个开发过程中对设计和优化一丝不苟. StoneMQ 经历了两次重大重组和许多次小重构, 具体细节可以通过其丰富的的 git 提交历史来探索.
Avoid Turning Functions into async Whenever Possible
避免盲目使用异步方法
In Rust, declaring a function as async fn compiles it into a state machine, and its invocation yields a Future. While asynchronous functions enable more concise code, excessive use of async—especially when applied to logic that could be executed synchronously—results in:
在 Rust 中, 将函数声明为 async fn
会将其编译成状态机, 并且它的调用会产生一个 Future
(译者注: async fn
是个语法糖, 返回一个匿名的状态机结构体, 该结构体实现了 Future
, 可被异步调度执行). 虽然完全使用异步函数能带来更简洁的代码, 但过度使用 async
时 (尤其是当应用于那些本就只能同步执行的逻辑时) 会导致:
-
Additional overhead from the Future state machine;
Future 状态机的额外开销;
-
Proliferation of tasks (e.g., Tokio tasks);
任务激增 (例如 Tokio 任务) ;
-
Increased load on the backend scheduler.
增加了后端调度程序的负载.
Therefore, the best practice when designing asynchronous interfaces is to prefer synchronous functions for logic that can run synchronously, reserving async functions solely for genuinely asynchronous operations.
因此, 在设计异步接口时, 最佳实践是首选同步方法用于可以同步运行的逻辑, 只在真正需要的时候使用异步方法.
If only a portion of a function requires asynchronous handling, separate the synchronous logic from the asynchronous part. This approach allows finer control over task granularity and scheduling overhead, ultimately enhancing overall efficiency and responsiveness.
如果函数只有一部分需要异步处理, 请将同步逻辑与异步逻辑部分分开. 这种方法可以更精细地控制任务粒度和计划开销, 最终提高整体效率和响应能力.
Consider the following example:
请考虑以下示例:
#![allow(unused)] fn main() { // INEFFICIENT: Entire function is async, though only the IO operations require async fn fetch_and_parse(url: &str) -> anyhow::Result<Data> { let body = reqwest::get(url).await?.text().await?; let parsed = serde_json::from_str::<Data>(&body)?; Ok(parsed) } // EFFICIENT: Separate synchronous parsing from asynchronous downloading async fn fetch_and_parse(url: &str) -> anyhow::Result<Data> { let body = reqwest::get(url).await?.text().await?; Ok(parse_json(&body)?) // parse_json is sync } fn parse_json(raw: &str) -> anyhow::Result<Data> { Ok(serde_json::from_str(raw)?) } }
Minimize the Number of Tokio Tasks
尽量减少 Tokio 任务的数量
The Tokio runtime manages asynchronous operations by scheduling tasks across CPU cores. Excessive task counts can cause several issues:
Tokio 通过跨 CPU 内核调度任务来管理异步操作. 任务数过多可能会导致以下几个问题:
-
Frequent task switching leads to CPU cache thrashing, degrading pipeline efficiency;
频繁的任务切换会导致 CPU 缓存抖动, 从而降低管道效率;
-
The scheduler may engage in “busy waiting” among numerous idle tasks, wasting CPU cycles;
调度器可能会在众多空闲任务中进行 “忙等待”, 浪费 CPU 周期;
-
Excessive task preemption slows response times and increases latency.
过多的任务抢占会减慢响应时间并增加延迟.
Therefore, judiciously consolidating tasks and avoiding the creation of numerous tiny asynchronous tasks is crucial for optimizing Tokio’s scheduling efficiency. By controlling the number of tasks, the scheduler’s overhead is reduced, allowing critical tasks to receive more CPU time slices, thereby improving system throughput and latency.
因此, 明智地整合任务并避免创建大量微小的异步任务对于优化 Tokio 的调度效率至关重要. 通过控制任务数量, 可以减少调度程序的开销, 使关键任务能够接收更多的 CPU 时间片, 从而提高系统吞吐量和延迟.
Example code:
示例代码:
#![allow(unused)] fn main() { // Excessive small tasks: spawning a task per record // 过多的小任务: 为每条 record 生成一个任务 async fn process_many(records: Vec<String>) { for r in records { tokio::spawn(async move { process_record(r).await; }); } } }
Reduce task count by batching processing:
通过批处理减少任务计数:
#![allow(unused)] fn main() { async fn process_many(records: Vec<String>) { // Spawn a single task to process multiple records in batch tokio::spawn(async move { for r in records { process_record(r).await; } }); } }
(译者注: 这种写法实际上顺序地执行 process_record
.)
Alternatively, design a pipeline using asynchronous channels to further reduce the number of tasks.
或者, 使用异步 Channel 设计一个处理管线来进一步减少任务数量.
Design Experience: Efficient Task Management and Request Handling in StoneMQ
设计经验: StoneMQ 高效任务管理和请求处理
When designing the server architecture for StoneMQ, I paid special attention to how asynchronous tasks (tokio::task
) are managed, aiming to maximize resource efficiency and system stability.
在为 StoneMQ 设计服务架构时, 我特别关注异步任务 (tokio::task
) 的管理方式, 旨在最大限度地提高资源效率和系统稳定性.
The Problem with Excessive Task Spawning
生成 (spawn) 过多任务的问题
A common but problematic pattern in async server design is to spawn a new task for every incoming request. While this approach is simple, it can quickly lead to resource exhaustion under high load, as each request consumes memory and scheduling overhead. This can degrade performance and even cause the server to become unresponsive.
异步服务器设计中一个常见但有问题的模式是为每个传入请求生成一个新任务. 虽然这种方法很简单, 但它会在高负载下迅速导致资源耗尽, 因为每个请求都会消耗内存和调度开销. 这可能会降低性能, 甚至导致服务器无响应.
My Approach: Connection-per-Task, Centralized Request Processing
我的方法: 连接-任务绑定, 集中处理请求
Instead of spawning a task for every request, I designed the server so that:
我没有为每个请求生成一个任务, 而是将服务器设计为:
-
Each client connection gets a single dedicated task:
每个客户端连接都有一个专用任务:
In the code, every accepted TCP connection is handled by a
ConnectionHandler
, which is run in its own Tokio task:在代码中, 每个接受的 TCP 连接都由一个
ConnectionHandler
处理, 该连接在其自己的 Tokio 任务中运行:#![allow(unused)] fn main() { tokio::spawn(async move { if let Err(err) = handler.handle_connection().await { error!("Connection error: {:?}", err); } drop(permit); }); }
-
All requests from all connections are funneled into a shared channel:
来自所有连接的所有请求都汇集到一个共享通道中:
Each request, regardless of which connection it comes from, is sent into a central channel (
async_channel::Sender<RequestTask>
).每个请求, 无论它来自哪个连接, 都被发送到一个集中式的 Channel (
async_channel::Sender<RequestTask>
) . -
A fixed-size pool of worker tasks processes all requests:
固定大小的 worker 任务池处理所有请求:
Instead of spawning a new task per request, a configurable number of worker tasks are started at server boot. These workers continuously pull requests from the channel and process them:
不是为每个请求生成一个新任务, 而是在服务器启动时启动可配置数量的 worker 任务. 这些 worker 不断从通道拉取请求并处理它们:
#![allow(unused)] fn main() { for i in 0..num_channels { let rx: async_channel::Receiver<RequestTask> = request_rx.clone(); let replica_manager = replica_manager.clone(); let group_coordinator = group_coordinator.clone(); let handle = tokio::spawn(async move { debug!("request handler worker {} started", i); while let Ok(request) = rx.recv().await { process_request(request, replica_manager.clone(), group_coordinator.clone()).await; } debug!("request handler worker {} exited", i); }); workers.insert(i, handle); } }
This thread-pool-like model ensures that system resources are used efficiently, and the number of concurrent tasks is controlled.
这种类似线程池的模型可确保系统资源得到有效利用, 并发任务的数量得到控制.
(译者疑问: 引入管道的开销? 可以在 accept loop 里面添加 semaphore 限制并发量?)
Automatic Worker Recovery
自动工作线程恢复
To further enhance robustness, I implemented a monitoring mechanism:
为了进一步增强健壮性, 我实现了一个监控机制:
A dedicated monitor task periodically checks the health of each worker. If a worker panics or exits unexpectedly, the monitor automatically spawns a replacement to maintain the desired pool size:
专用监控任务会定期检查每个工作线程的运行状况. 如果 worker 出现 panic 或意外退出, 监控任务会自动生成新的 worker 以保持所需的池大小:
#![allow(unused)] fn main() { if join_error.is_panic() { // ... log error ... // re-generate a new task let rx = request_rx.clone(); let replica_manager = replica_manager.clone(); let group_coordinator = group_coordinator.clone(); let new_worker = tokio::spawn(async move { while let Ok(request) = rx.recv().await { process_request(request, replica_manager.clone(), group_coordinator.clone()).await; } }); workers.insert(id, new_worker); } }
Benefits
好处
-
Resource Efficiency: By limiting the number of worker tasks, the server avoids the overhead of thousands of concurrent tasks under heavy load.
资源效率: 通过限制 worker 任务的数量, 服务器避免了在重负载下数千个并发任务的开销.
-
Predictable Performance: The thread-pool model provides more consistent latency and throughput.
可预测的性能: 线程池模型提供更一致的延迟和吞吐量.
-
Fault Tolerance: The monitor ensures that if a worker fails, it is quickly replaced, maintaining system stability.
容错: 监视器确保在 worker 发生故障时, 可以快速更换它, 从而保持系统稳定性.
-
Scalability: The number of worker tasks can be tuned according to system capabilities and expected load.
可扩展性: 可以根据系统功能和预期负载调整 worker 任务的数量.
Conclusion
结论
This design pattern—dedicating a task per connection, centralizing request processing, and using a fixed-size worker pool—strikes a balance between concurrency and resource control. It’s a practical approach for building scalable, robust, and efficient async servers in Rust with Tokio.
这种设计模式 (每个连接专用一个任务、集中请求处理和使用固定大小的工作线程池) 在并发和资源控制之间取得了平衡. 这是使用 Tokio 在 Rust 中构建可扩展、健壮且高效的异步服务器的实用方法.
Such requirements frequently arise across various scenarios. To address this, I developed a utility class that facilitates implementing such designs with ease. For detailed reference, please consult the multiple_channel_worker_pool
and single_channel_worker_pool
modules within the Utils package of StoneMQ.
此类要求经常出现在各种场景中. 为了解决这个问题, 我开发了一个实用程序类, 以便于轻松实现此类设计. 详细参考请参考 StoneMQ 的 utils 包中的 multiple_channel_worker_pool
和 single_channel_worker_pool
模块.
Design Experience: Favoring Lock-Free Architectures and Minimizing Use of Tokio Async Locks
设计经验: 支持无锁架构并最大限度地减少 Tokio 异步锁的使用
When architecting high-performance async systems in Rust, I strongly prefer designs that avoid locks—especially asynchronous locks—whenever possible. This principle is rooted in both performance considerations and code safety.
在 Rust 中构建高性能异步系统时, 我非常喜欢尽可能避免锁 (尤其是异步锁) 的设计. 此原则植根于性能考虑和代码安全.
Why Avoid Locks, Especially Async Locks?
为什么要避免使用锁, 尤其是异步锁?
-
Contention and Complexity: Traditional locks (Mutex, RwLock) can become contention points and introduce complexity, especially in concurrent environments.
争用和复杂性: 传统锁 (Mutex、RwLock) 可能会成为争用点并引入复杂性, 尤其是在并发环境中.
-
Async Lock Performance: Tokio’s asynchronous locks (
tokio::sync::Mutex
,tokio::sync::RwLock
) are significantly slower than their synchronous counterparts. They are designed for correctness in async contexts, but their performance overhead can be substantial, especially under high contention or frequent lock/unlock cycles.异步锁性能: Tokio 的异步锁 (
tokio::sync::Mutex
、tokio::sync::RwLock
) 明显慢于同步锁. 它们旨在实现异步上下文中的正确性, 但它们的性能开销可能很大, 尤其是在高争用或频繁的锁定/解锁周期下. -
Deadlock Risk: Holding any lock across
.await
points is risky and can easily lead to deadlocks or subtle bugs.死锁风险: 跨
.await
持有任何锁都是有风险的, 很容易导致死锁或细微的错误.
My Approach: Channels, Message Passing, and Task Ownership
我的方法: 通道、消息传递和任务所有权
-
Single-threaded Task Ownership: Each async task owns its state, eliminating the need for locks in most cases.
单线程任务所有权: 每个异步任务都拥有其状态, 在大多数情况下无需锁定.
-
Channel-based Communication: Tasks interact via channels, passing messages instead of sharing mutable state.
基于 Channel 的通信: 任务通过 Channel 交互, 传递消息而不是共享可变状态.
-
Minimal, Synchronous Locking: When shared mutable state is unavoidable, I use fine-grained, synchronous locks (
std::sync::Mutex
/RwLock
), and always ensure locks are held for the shortest possible time and never across.await
.最小同步锁定: 当共享可变状态不可避免时, 我会使用细粒度的同步锁 (
std::sync::Mutex
/RwLock
) , 并始终确保锁的持有时间尽可能短, 并且永远不会跨越.await
. -
Rare Use of Tokio Async Locks: I avoid Tokio’s async locks unless absolutely necessary. In almost all cases, the architecture is designed so that async locks are not required.
避免使用 Tokio 异步锁: 除非绝对必要, 否则我会避免使用 Tokio 的异步锁. 在几乎所有情况下, 体系结构的设计都不需要异步锁.
Example: Journal Log Write Path
示例: 日志写入
In the journal log write implementation, all critical sections that require locking are handled synchronously and released before any async operation:
在日志写入实现中, 所有需要锁定的关键部分都被同步处理, 并在任何异步作之前释放:
#![allow(unused)] fn main() { { // Synchronously update active segment index let mut active_seg_index = self.active_segment_index.write(); let old_segment_index = std::mem::replace(&mut *active_seg_index, new_seg); self.active_segment_base_offset .store(new_base_offset, Ordering::Release); // ... update other state ... } // lock released here // Async operations follow, with no lock held get_journal_segment_writer() .append_journal(journal_log_write_op) .await?; }
Benefits
好处
-
Performance: Avoiding Tokio’s async locks removes a major source of latency and contention in async Rust applications.
性能: 避免使用 Tokio 的异步锁, 消除异步 Rust 应用程序中延迟和争用的主要来源.
-
Safety: By never holding locks across .await, I eliminate a whole class of async deadlocks.
安全: 通过从不跨
.await
持有锁, 我消除了一整类异步死锁. -
Simplicity: The code is easier to reason about, as ownership and mutability are clear and explicit.
简单: 代码更容易推理, 因为所有权和可变性清晰明了.
Conclusion
结论
By leveraging message passing, task-local state, and only minimal, synchronous locking, I achieve both high efficiency and safety in async Rust systems. Avoiding Tokio’s async locks is a deliberate choice, as their performance overhead is rarely justified except in very specific scenarios. This approach leads to scalable, maintainable, and robust code.
通过利用消息传递、任务本地状态和最低限度的同步锁定, 我在异步 Rust 系统中实现了高效率和安全性. 避免使用 Tokio 的异步锁是一个深思熟虑的选择, 因为除非在非常特殊的情况下, 否则它们的性能开销很少是合理的. 这种方法可以产生可扩展、可维护且健壮的代码.
Design Experience: Judicious Use of Unsafe Code for Performance-Critical Paths
设计经验: 对性能关键路径明智地使用 unsafe 代码
In Rust, safety is a core language feature, but there are scenarios—especially in performance-critical systems programming—where the cost of absolute safety can be significant. In such cases, I carefully consider the use of unsafe code to achieve the necessary performance, while still maintaining overall correctness and encapsulation.
在 Rust 中, 安全性是核心语言功能, 但在某些情况下 (尤其是在性能关键型系统编程中) , 绝对安全性的成本可能很高. 在这种情况下, 我会仔细考虑使用 unsafe 代码来实现必要的性能, 同时仍然保持整体正确性和(安全 API 的)封装.
Why Use Unsafe Code?
为什么要使用 unsafe 代码?
-
Performance: Some operations, such as memory-mapped file access or direct byte manipulation, can be much faster when performed without the overhead of Rust’s safety checks.
性能: 某些操作 (例如内存映射文件访问或直接字节操作) 在没有 Rust 安全检查开销的情况下执行时可能会快得多.
-
System-level Access: Certain low-level APIs (e.g., memory mapping, FFI, or direct buffer manipulation) require unsafe blocks to interact with OS or hardware resources.
系统级访问: Rust 无法保证低级 API (例如, 内存映射、FFI 或直接缓冲区操作) 的安全性, 需要 unsafe 块才能与操作系统或硬件资源交互.
My Approach: Isolate and Audit Unsafe Code
我的方法: 隔离和审计不安全代码
-
Encapsulation: Unsafe code is always encapsulated within well-tested, minimal, and clearly documented functions or modules.
API 封装: 不安全代码始终封装在经过充分测试、最小且记录清晰的函数或模块中.
-
Justification: Unsafe is only used when profiling or design analysis shows that safe alternatives are a bottleneck.
明晰理由: 仅当分析或设计分析表明安全替代方案是瓶颈时, 才使用 unsafe 代码.
-
Testing: I ensure that all unsafe code is covered by thorough tests and code reviews.
充分测试: 我确保所有不安全的代码都经过彻底的测试和代码审查.
Example: Memory-Mapped Index Files
示例: 内存映射索引文件
In the index_file.rs
module, I use the memmap2
crate to memory-map index files for fast, zero-copy access. Creating a memory map inherently requires unsafe code, as shown here:
在 index_file.rs
模块中, 我使用 memmap2
crate 对索引文件进行内存映射, 以实现快速的零拷贝访问. 创建内存映射本身需要 unsafe 代码, 如下所示:
#![allow(unused)] fn main() { let mmap = unsafe { MmapOptions::new().map(&file)? }; }
This allows the index file to be accessed as a byte slice, enabling efficient binary search and direct manipulation without extra copying or allocation. Similarly, for writable index files:
这允许将索引文件作为字节切片进行访问, 从而实现高效的二进制搜索和直接作, 而无需额外的复制或分配. 同样, 对于可写索引文件:
#![allow(unused)] fn main() { let mmap = unsafe { MmapOptions::new().map_mut(&file)? }; }
By isolating the unsafe block to just the memory mapping operation, the rest of the code can remain safe and idiomatic Rust.
通过将 unsafe 块隔离为内存映射操作中, 代码的其余部分可以保持安全和惯用的 Rust.
Benefits
好处
-
Maximum Performance: Direct memory access and zero-copy operations are critical for high-throughput log/index management.
最高性能: 直接内存访问和零拷贝作对于高吞吐量日志/索引管理至关重要.
-
Controlled Risk: By limiting the scope of unsafe, I minimize the risk of undefined behavior.
受控风险: 通过限制 unsafe 的范围, 我将未定义行为的风险降至最低.
-
Maintainability: Encapsulated unsafe code is easier to audit and reason about.
可维护性: 封装的不安全代码更容易审计和推理.
Conclusion
结论
While Rust’s safety guarantees are invaluable, there are times when performance requirements justify the careful use of unsafe code. By isolating and rigorously testing these sections, I can achieve both the speed and reliability needed for system-level components like log and index management.
虽然 Rust 的安全保证非常宝贵, 但有时性能要求证明谨慎使用 unsafe 代码是合理的. 通过隔离和严格测试这些部分, 我可以实现系统级组件 (如日志和索引管理) 所需的速度和可靠性.
Design Experience: Separating Mutable and Immutable Data to Optimize Lock Granularity
设计经验: 分离可变和不可变数据以优化锁粒度
A key architectural principle I follow in high-performance systems is to separate mutable and immutable data, and to use the finest possible lock granularity. This approach minimizes contention, improves concurrency, and makes the system easier to reason about.
我在高性能系统中遵循的一个关键架构原则是分离可变和不可变数据, 并使用尽可能精细的锁粒度. 这种方法最大限度地减少了争用, 提高了并发性, 并使系统更易于推理.
Why Separate Mutable and Immutable Data?
为什么要将可变数据和不可变数据分开?
-
Reduced Contention: By isolating mutable state, only the truly changing parts of the system require synchronization, while immutable data can be freely shared and accessed without locks.
减少争用: 通过隔离可变状态, 只有系统中真正变化的部分需要同步, 而不可变数据可以自由共享和访问, 而无需锁定.
-
Optimized Locking: Fine-grained locks (e.g., per-segment or per-structure) allow multiple operations to proceed in parallel, rather than serializing all access through a single global lock.
优化锁定: 细粒度锁 (例如, 每个段或每个结构) 允许多个操作并行进行, 而不是通过单个全局锁序列化所有访问.
-
Clearer Ownership: The distinction between mutable and immutable data clarifies which parts of the code can safely share data and which require careful coordination.
更清晰的所有权: 可变数据和不可变数据之间的区别阐明了代码的哪些部分可以安全地共享数据, 哪些部分需要仔细协调.
My Approach: Example from Journal Log Management
我的方法: 日志管理示例
In the journal log implementation, I apply this principle by:
在日志实现中, 我通过以下方式应用此原则:
-
Using separate structures for mutable and immutable data:
使用不同的数据结构包裹可变和不可变数据:
-
The
active_segment_index
(the currently writable segment) is protected by aRwLock
, allowing multiple readers or a single writer.active_segment_index
(当前可写段) 受RwLock
保护, 允许多个读取器或单个写入器. -
The set of segment base offsets (segments_order) is also protected by a RwLock, but is only modified when segments are added or removed.
段基址偏移集合 (segments_order) 也受
RwLock
保护, 但仅在添加或删除段时进行修改. -
All read-only segments are stored as
Arc<ReadOnlySegmentIndex>
in a concurrentDashMap
, allowing lock-free concurrent reads.所有只读段都以
Arc<ReadOnlySegmentIndex>
的形式存储在并发DashMap
中, 允许无锁并发读取.
#![allow(unused)] fn main() { #[derive(Debug)] pub struct JournalLog { // Ordered set of segment base offsets (non-active segments) segments_order: RwLock<BTreeSet<i64>>, // Map of base offsets to read-only segments segment_index: DashMap<i64, Arc<ReadOnlySegmentIndex>>, // Currently active segment active_segment_index: RwLock<ActiveSegmentIndex>, // ... other fields ... } }
-
-
Read-only data is stored in
ReadOnlySegmentIndex
:只读数据存储在
ReadOnlySegmentIndex
中:Once a segment is no longer active, it is converted to a read-only structure and stored in an
Arc
, making it safe to share across threads without any locking.一旦一个段不再处于活动状态, 它就会被转换为只读结构并存储在
Arc
中, 从而可以安全地跨线程共享, 而无需任何锁定. -
Lock scope is minimized:
锁定范围最小化:
For example, when rolling a segment, the lock on
active_segment_index
is only held for the duration of the swap, and not across any async or I/O operations.例如, 在滚动段时, 对
active_segment_index
的锁定仅在交换期间保持, 而不在任何异步或 I/O 操作中保持.
Benefits
好处
-
High Concurrency: Multiple threads can read from different segments or query the segment index concurrently, without blocking each other.
高并发: 多个线程可以从不同的 Segment 读取数据或并发查询 Segment 索引, 而不会相互阻塞.
-
Minimal Locking Overhead: Only the small, mutable parts of the system are protected by locks, and those locks are held for the shortest possible time.
最小锁定开销: 只有系统的小的可变部分受锁保护, 并且这些锁的持有时间尽可能短.
-
Scalability: The system can efficiently handle many concurrent operations, such as reads, writes, and segment management.
可扩展性: 该系统可以有效地处理许多并发操作, 例如读取、写入和分段管理.
Conclusion
结论
By separating mutable and immutable data, and applying the smallest possible lock granularity, I achieve both high performance and maintainability. This pattern is especially effective in log-structured storage and similar systems, where most data is append-only or read-mostly, and only a small portion is actively mutated.
通过分离可变和不可变数据, 并应用尽可能小的锁粒度, 我实现了高性能和可维护性. 这种模式在日志结构存储和类似系统中特别有效, 其中大多数数据是仅追加或读取为主, 只有一小部分是主动更改的.
Design Experience: Separate Asynchronous and Synchronous Data Operations to Optimize Lock Usage
设计经验: 分离异步和同步数据操作以优化锁使用
The locking requirements of asynchronous and synchronous code differ fundamentally: holding a synchronous lock within asynchronous code can cause the entire Future dependency chain to block, negating the benefits of the asynchronous model. Conversely, asynchronous locks incur higher overhead and employ more complex mechanisms.
异步和同步代码的锁定要求从根本上不同: 在异步代码中持有同步锁会导致整个 Future 依赖链阻塞, 从而抵消异步模型的好处. 相反, 异步锁会产生更高的开销并采用更复杂的机制.
The best practice is to segregate the data involved in asynchronous operations from that used in synchronous operations, ensuring that locks protecting synchronous data do not span asynchronous contexts. This allows synchronous portions to safely utilize efficient synchronous locks (such as std::sync::Mutex
), while the asynchronous parts avoid blocking or contention.
最佳做法是将异步操作中涉及的数据与同步操作中使用的数据分开, 确保保护同步数据的锁不会跨越异步上下文. 这允许同步部分安全地使用高效的同步锁 (例如 std::sync::Mutex
) , 而异步部分避免阻塞或争用.
By isolating data domains and restructuring access patterns accordingly, you can significantly reduce lock contention and blocking, thereby enhancing asynchronous execution efficiency.
通过隔离数据域并相应地重构访问模式, 您可以显著减少锁争用和阻塞, 从而提高异步执行效率.
Example: StoneMQ Journal Log Write Path
示例: StoneMQ 日志写入
In my project, the journal log write implementation demonstrates this pattern clearly. Here’s a simplified version:
在我的项目中, 日志写入的实现清楚地演示了这种模式. 这是一个简化版本:
#![allow(unused)] fn main() { // Synchronously update the active segment index { let mut active_seg_index = self.active_segment_index.write(); let old_segment_index = std::mem::replace(&mut *active_seg_index, new_seg); // ... update other state ... } // lock released here // Now perform async operations, with no lock held get_journal_segment_writer() .append_journal(journal_log_write_op) .await?; }
Notice how the lock is acquired, the necessary mutation is performed, and then the lock is released before any async operation is awaited.
请注意如何获取锁, 如何执行必要的更改, 然后在等待任何异步操作之前释放锁.
Example: Group Coordinator
示例: 组协调器
In the group coordinator logic, the same principle is applied. Before calling any async function, the lock is explicitly dropped:
在组协调器的逻辑中, 应用了相同的原则. 在调用任何异步函数之前, 显式删除锁:
#![allow(unused)] fn main() { ```rust,no_run drop(locked_group); self.maybe_prepare_rebalance(group).await; }
This ensures that no lock is ever held across an .await
, preventing deadlocks and maximizing concurrency.
这可确保在 .await
之间不会持有任何锁, 从而防止死锁并最大化并发性.
Benefits
好处
-
Performance: Synchronous locks are much faster than async locks, and can be used safely when not crossing async boundaries.
性能: 同步锁比异步锁快得多, 并且在不跨越异步边界时可以安全地使用.
-
Safety: Avoids deadlocks and subtle bugs that can arise from holding locks across
.await
.安全性: 避免了因在
.await
中持有锁而可能出现的死锁和细微的 bug. -
Clarity: The code is easier to reason about, as the boundaries between sync and async operations are explicit.
清晰性: 代码更容易推理, 因为 sync 和 async 操作之间的界限是明确的.
Conclusion
结论
By separating async and sync data, and ensuring that locks are only used for sync data and never held across .await
, you can write high-performance, safe, and maintainable async Rust code. This pattern is a cornerstone of robust async system design, and has paid off time and again in my own projects.
通过分离异步数据和同步数据, 并确保锁仅用于同步数据, 并且永远不会跨 .await
持有, 您可以编写高性能、安全且可维护的异步 Rust 代码. 此模式是健壮的异步系统设计的基石, 并且在我自己的项目中一次又一次地得到了回报.
Design Experience: Employ Static Dispatch in Performance-Critical Paths Whenever Possible
设计经验: 尽可能在性能关键型路径中使用静态调度
Design Experience: Using Enums for Protocol Type Polymorphism in Performance-Critical Code
设计经验: 在性能关键型代码中使用枚举实现协议类型多态性
When building the protocol layer for StoneMQ, I faced a key design decision: how to represent and handle the various data types that appear in protocol messages. The two main options were using Rust’s trait objects for dynamic polymorphism, or using an enum to statically represent all possible protocol types.
在为 StoneMQ 构建协议层时, 我面临着一个关键的设计决策: 如何表示和处理协议消息中出现的各种数据类型. 两个主要选项是使用 Rust 的 trait 对象进行动态多态性, 或者使用枚举静态表示所有可能的协议类型.
Why Not Trait Objects?
为什么不使用 trait 对象?
Trait objects (dyn Trait
) are a common way to achieve polymorphism in Rust. They allow different types to be handled through a common interface, with the actual method implementations determined at runtime. This approach is flexible and can make code easier to extend. However, it comes with some downsides:
trait 对象 (dyn Trait
) 是在 Rust 中实现多态性的常用方法. 它们允许通过一个公共接口处理不同类型的方法, 实际的方法实现在运行时确定. 此方法非常灵活, 可以使代码更易于扩展. 但是, 它也有一些缺点:
-
Dynamic dispatch: Every method call on a trait object involves a runtime lookup, which adds overhead.
动态分派: 对 trait 对象的每个方法调用都涉及运行时查找, 这会增加开销.
-
Heap allocation: Trait objects often require heap allocation, especially when used in collections.
堆分配: trait 对象通常需要堆分配, 尤其是在集合中使用时.
-
Performance impact: In a performance-critical layer like a network protocol, even small inefficiencies can add up.
性能影响: 在网络协议等性能关键层中, 即使是很小的低效率也会累积起来.
The Enum Approach
使用枚举
To avoid these costs, I chose to use an enum—ProtocolType
—to represent all protocol data types. Each variant of the enum corresponds to a specific protocol type, such as I8
, I16
, PString
, Array
, etc. Here’s a simplified excerpt from the actual code:
为了避免这些开销, 我选择使用枚举 ProtocolType
来表示所有协议数据类型. 枚举的每个变体对应于特定的协议类型, 例如 I8
, I16
, PString
, Array
等. 以下是实际代码的简化摘录:
#![allow(unused)] fn main() { pub enum ProtocolType { Bool(Bool), I8(I8), I16(I16), I32(I32), U32(U32), I64(I64), PString(PString), NPString(NPString), PBytes(PBytes), NPBytes(NPBytes), PVarInt(PVarInt), PVarLong(PVarLong), Array(ArrayType), Records(MemoryRecords), Schema(Arc<Schema>), ValueSet(ValueSet), } }
This design allows me to match on the enum and handle each type explicitly, with all dispatch resolved at compile time. For example, when calculating the wire format size of a protocol value, I can use a match
statement:
这种设计允许我在枚举上进行匹配并显式处理每个类型, 并在编译时解析所有分支. 例如, 在计算协议值的线路格式大小时, 我可以使用 match
语句:
#![allow(unused)] fn main() { pub fn size(&self) -> i32 { match self { ProtocolType::Bool(bool) => bool.wire_format_size() as i32, ProtocolType::I8(i8) => i8.wire_format_size() as i32, // ... other types ... ProtocolType::Array(array) => array.size() as i32, // ... ProtocolType::Schema(_) => { panic!("Unexpected calculation of schema size"); } ProtocolType::ValueSet(valueset) => valueset.size() as i32, } } }
Benefits in Practice
实践中的好处
-
Performance: All type dispatch is handled at compile time, with no runtime overhead from dynamic dispatch.
性能: 所有类型分支都在编译时处理, 不会产生运行时开销.
-
Type Safety: The compiler ensures that all protocol types are handled, reducing the risk of runtime errors.
类型安全: 编译器确保处理所有协议类型, 从而降低运行时错误的风险.
-
Extensibility: Adding a new protocol type is as simple as adding a new enum variant and updating the relevant match arms.
可扩展性: 添加新的协议类型很简单: 添加新的枚举变体并更新相关的
match
分支.
Example: Array Construction
示例: 数组构造
I also used macros and generic functions to make it easy to construct arrays of protocol types:
我还使用了宏和泛型函数来轻松构造协议类型的数组:
#![allow(unused)] fn main() { macro_rules! array_of { ($func_name:ident, $type:ty) => { pub fn $func_name(value: $type) -> ProtocolType { ProtocolType::Array(ArrayType { can_be_empty: false, p_type: Arc::new(value.into()), values: None, }) } }; } // Usage: array_of!(array_of_i32_type, i32); array_of!(array_of_string_type, String); }
Conclusion
结论
By using an enum to represent protocol types, I achieved efficient, type-safe, and maintainable code for the protocol layer. This approach is especially valuable in systems where performance is critical and the set of types is known and relatively stable.
通过使用枚举来表示协议类型, 我为协议层实现了高效、类型安全且可维护的代码. 这种方法在性能至关重要且类型集已知且相对稳定的系统中特别有价值.
Summary
总结
Achieving high-performance asynchronous Rust projects transcends mere usage of the async/await syntax; it fundamentally relies on a deep understanding of the underlying task scheduling, lock optimization, and architecture design principles. From eschewing gratuitous async functions to judiciously managing lock granularity; from minimizing the number of spawned tasks to adopting a message-passing architecture; from cautious use of unsafe code to segregating asynchronous and synchronous data; and finally, to maximizing the use of static dispatch—each practice serves to bolster both the efficiency and robustness of your application.
实现高性能的异步 Rust 项目不能局限于单纯使用 async/await 语法; 它从根本上依赖于对底层任务调度、锁优化和架构设计原则的深刻理解. 从避免无端的异步函数到明智地管理锁粒度; 从最大限度地减少生成任务的数量到采用消息传递架构; 从谨慎使用不安全代码到分离异步和同步数据; 最后, 最大限度地利用静态调度: 每种做法都有助于提高应用程序的效率和健壮性.
I hope this summary offers valuable insights and assistance to those engaged in Rust asynchronous development. Contributions and discussions are warmly welcomed, as we collectively strive to elevate the practical maturity of the Rust async ecosystem!
我希望这个总结能为那些从事 Rust 异步开发的人提供有价值的见解和帮助. 热烈欢迎贡献和讨论, 我们的共同努力将提升 Rust 异步生态系统的实际成熟度!