基于Rust构建APISIX插件实现对存量Ruby系统的身份认证与流量治理


项目的技术债务积累,往往始于一个看似无害的认证模块。我们面对的正是这样一个典型场景:一个承载着核心业务的Ruby on Rails monolith应用,其用户认证系统基于一套内部自定义的、有状态的Token机制。随着业务流量的激增,这个认证模块成为了整个系统的性能瓶 ઉ颈。每一次API请求都需要与后端的Session存储进行交互验证,高并发下数据库连接被迅速耗尽,请求延迟居高不下。新的前端项目采用React与MobX技术栈,期望获得流畅的单页应用体验,但被这个陈旧的认证机制严重拖累。

摆在我们面前的问题很明确:如何在不重写核心Ruby应用的前提下,解决认证性能瓶颈,并为现代前端提供一个无状态、标准化的认证方案(如JWT)。

架构决策的十字路口

在技术选型会议上,我们探讨了两种主流方案。

方案A:构建专用的认证微服务

这是最直接的思路。将认证逻辑从Ruby monolith中剥离出来,用Go或Rust这类高性能语言重写,并部署为一个独立的微服务。

  • 优势:

    1. 权责清晰,新服务可以独立迭代和扩缩容。
    2. 技术栈可以彻底现代化,摆脱Ruby的性能限制。
    3. 可以为全公司的所有业务提供统一的认证能力。
  • 劣势:

    1. 开发成本高昂: 重写认证逻辑,意味着要处理所有历史包袱,包括多种密码加密方式、用户权限模型等,工作量巨大。
    2. 数据同步复杂: 新服务需要访问用户数据。这意味着要进行数据库拆分或引入数据同步机制,无论是双写、CDC还是定时同步,都会引入新的复杂性和潜在的数据不一致风险。
    3. 运维负担加重: 引入新的服务、新的数据库、新的部署流程,对运维团队提出了更高的要求。在真实项目中,任何新组件的引入都必须评估其带来的长期维护成本。

方案B:在API网关层进行逻辑前置

这个方案的核心思想是利用API网关的可扩展性,将认证逻辑前置处理。客户端与网关交互,网关负责与后端的旧认证系统通信,并将旧的、有状态的Token转换为新的、无状态的JWT,再将请求代理到上游服务。

  • 优势:

    1. 对后端应用透明: Ruby monolith无需任何代码改动,继续维持其现有的认证方式。
    2. 性能提升显著: 高频的Token验证操作被卸载到高性能的网关层,保护了脆弱的后端服务。
    3. 快速落地: 无需处理复杂的数据迁移,开发周期更短。
  • 劣势:

    1. 网关成为关键瓶颈: 如果网关插件的性能不佳或存在缺陷,将影响所有流量。
    2. 技术选型关键: 网关插件的开发语言和运行模式直接决定了方案的成败。

经过权衡,我们选择了方案B。它的侵入性更小,风险更可控,能以最小的代价快速解决当前最紧迫的性能问题。这在处理存量系统时,是一个非常务实的工程决策。接下来,焦点就落在了API网关和插件实现技术的选择上。我们最终确定使用Apache APISIX,并采用Rust来编写外部插件(External Plugin)。

  • 为什么是APISIX? 它基于Nginx和LuaJIT,性能极高。更重要的是,它拥有强大的插件生态和灵活的插件运行机制,特别是支持外部插件(External Plugin Runner),允许我们用其他语言(如Java, Go, Python, Rust)来编写插件逻辑,而不必局限于Lua。
  • 为什么是Rust?
    1. 极致性能: Rust提供与C/C++相媲美的运行时性能,且没有GC(垃圾回收)暂停的困扰。对于网关这种需要处理海量请求、对延迟极度敏感的场景,这是至关重要的。
    2. 内存安全: Rust的所有权和借用检查机制在编译期就杜绝了空指针、悬垂指针、数据竞争等内存安全问题。对于一个7x24小时运行的核心网络组件,稳定性压倒一切。
    3. 强大的并发能力: Rust的async/await语法糖和成熟的异步运行时(如Tokio)使得编写高并发网络应用变得简单而高效。

核心实现概览

我们的目标是创建一个APISIX的Rust外部插件,它能拦截特定的API请求,执行以下逻辑:

  1. 登录流程: 对于登录请求(/api/auth/login),插件会将其透传给后端的Ruby应用。当Ruby应用验证成功并返回其自定义的legacy_token后,插件会拦截响应,调用一个内部接口(或直接查询数据库)获取该用户的详细信息和权限,然后生成一个标准的JWT返回给前端。
  2. 业务请求流程: 对于携带JWT的后续业务请求,插件会直接在网关层进行JWT的验签。验签通过后,将JWT中的用户信息(如用户ID)注入到请求头中,再转发给上游的Ruby服务。这样,Ruby服务就无需再处理Token验证,只需从请求头中信任地获取用户信息即可。

下面是这个架构的交互流程图:

sequenceDiagram
    participant Client as Client (MobX SPA)
    participant Gateway as APISIX Gateway
    participant RustPlugin as Rust Plugin Runner
    participant RubyAuth as Legacy Ruby Auth Service

    Client->>+Gateway: POST /api/auth/login (credentials)
    Gateway->>+RubyAuth: Forward /api/auth/login request
    RubyAuth-->>-Gateway: Response with `legacy_token`
    Note over Gateway,RustPlugin: Gateway intercepts response
    Gateway->>+RustPlugin: Process response with `legacy_token`
    RustPlugin->>+RubyAuth: GET /internal/user_info (using `legacy_token`)
    RubyAuth-->>-RustPlugin: User profile & permissions
    RustPlugin-->>-Gateway: Generate new JWT
    Gateway-->>-Client: Respond with standard JWT

    Client->>+Gateway: GET /api/orders (Authorization: Bearer )
    Note over Gateway,RustPlugin: Gateway intercepts request
    Gateway->>+RustPlugin: Verify JWT signature & claims
    RustPlugin-->>-Gateway: Verification OK, inject User-ID header
    Gateway->>+RubyAuth: Forward request with `X-User-ID` header
    RubyAuth-->>-Gateway: Response data
    Gateway-->>-Client: Forward response data

APISIX与Rust插件配置

首先,我们需要在APISIX的配置文件config.yaml中启用外部插件运行器,并指定通信方式。在生产环境中,使用Unix Domain Socket比TCP端口更高效、更安全。

config.yaml

apisix:
  # ... 其他配置
  # enable_real_ip: true

ext-plugin:
  cmd:
    # 插件运行器的启动命令,APISIX会管理这个进程
    - /path/to/your/rust_plugin_runner
  path: /tmp/apisix-rust.sock # 使用Unix Domain Socket

然后,我们创建一个APISIX路由,将这个外部插件绑定到需要保护的API上。

APISIX Route Configuration (via Admin API)

curl -i http://127.0.0.1:9180/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
    "uri": "/api/*",
    "plugins": {
        "ext-plugin-pre-req": {
            "conf": [
                { "name": "rust-auth-plugin", "value": "{\"jwt_secret\":\"your-super-secret-key\"}" }
            ]
        }
    },
    "upstream": {
        "type": "roundrobin",
        "nodes": {
            "ruby-app:3000": 1
        }
    }
}'

这里我们定义了一个名为rust-auth-plugin的插件,并在ext-plugin-pre-req阶段执行它。插件的配置(如JWT密钥)通过value字段以JSON字符串的形式传递。

Rust插件核心代码实现

现在进入最核心的部分:Rust插件的编码。我们使用apisix-rust-plugin-sdk这个crate来简化开发。

Cargo.toml

[package]
name = "apisix-rust-auth-plugin"
version = "0.1.0"
edition = "2021"

[dependencies]
apisix-rust-plugin-sdk = "0.7"
tokio = { version = "1", features = ["full"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
jsonwebtoken = "8"
reqwest = { version = "0.11", features = ["json"] }
chrono = "0.4"
tracing = "0.1"
tracing-subscriber = "0.3"

src/main.rs

use apisix_rust_plugin_sdk::{
    bindings::{apisix_conf_get_str, apisix_resp_set_body, apisix_resp_set_header},
    filter::{Filter, FilterAction, FilterContext, FilterStatus},
    http::{Method, Request},
    log::info,
    utils::yield_now,
    Plugin,
};

use chrono::Utc;
use jsonwebtoken::{decode, encode, DecodingKey, EncodingKey, Header, Validation};
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;

// 定义插件配置结构体
#[derive(Deserialize)]
struct AuthPluginConfig {
    jwt_secret: String,
}

// 定义JWT的Claims
#[derive(Debug, Serialize, Deserialize)]
struct Claims {
    sub: String, // Subject (user ID)
    exp: usize,  // Expiration Time
    roles: Vec<String>,
}

// 插件主体结构
struct AuthPlugin {
    client: Arc<Client>,
}

impl Plugin for AuthPlugin {
    type Conf = AuthPluginConfig;
    type Ctx = ();

    fn new(conf: Self::Conf) -> Self {
        // 在生产环境中,应该对HTTP Client进行更精细的配置
        // 例如设置连接池大小、超时等
        let client = Arc::new(Client::new());
        Self { client }
    }

    fn name() -> &'static str {
        "rust-auth-plugin"
    }
}

// 实现Filter trait来处理请求
#[async_trait::async_trait]
impl Filter for AuthPlugin {
    async fn on_http_request(&mut self, conf: &AuthPluginConfig, ctx: &mut FilterContext) -> FilterAction {
        let req = ctx.req();
        let path = req.path();

        // 登录请求,直接放行到后端Ruby应用
        if path == "/api/auth/login" && req.method() == Method::POST {
            info!("Login request detected, passing through.");
            return FilterAction::Continue;
        }

        // 非登录请求,验证JWT
        let auth_header = match req.header("Authorization") {
            Some(h) => h,
            None => {
                // 生产级的错误处理应该返回结构化的JSON
                ctx.resp()
                    .set_status_code(401)
                    .set_body("Missing Authorization Header".as_bytes().to_vec());
                return FilterAction::Stop;
            }
        };

        if !auth_header.starts_with("Bearer ") {
            ctx.resp()
                .set_status_code(401)
                .set_body("Invalid token format".as_bytes().to_vec());
            return FilterAction::Stop;
        }

        let token = &auth_header[7..];
        let decoding_key = DecodingKey::from_secret(conf.jwt_secret.as_ref());
        let validation = Validation::default();

        match decode::<Claims>(token, &decoding_key, &validation) {
            Ok(token_data) => {
                // 验证成功,将用户信息注入请求头
                ctx.req_mut()
                    .headers_mut()
                    .insert("X-User-ID", token_data.claims.sub.as_str());

                ctx.req_mut()
                    .headers_mut()
                    .insert("X-User-Roles", token_data.claims.roles.join(",").as_str());

                FilterAction::Continue
            }
            Err(e) => {
                info!("JWT validation failed: {}", e);
                ctx.resp()
                    .set_status_code(401)
                    .set_body("Invalid or expired token".as_bytes().to_vec());
                FilterAction::Stop
            }
        }
    }

    // 在响应阶段处理登录成功后的Token转换
    async fn on_http_response(&mut self, conf: &AuthPluginConfig, ctx: &mut FilterContext) -> FilterStatus {
        let req = ctx.req();
        let path = req.path();
        let resp_status = ctx.resp().status_code();

        // 仅当是登录请求且后端响应成功时才处理
        if path == "/api/auth/login" && resp_status >= 200 && resp_status < 300 {
            info!("Intercepting successful login response.");

            let body = ctx.resp().body();
            let legacy_data: Result<HashMap<String, String>, _> = serde_json::from_slice(&body);

            if let Ok(data) = legacy_data {
                if let Some(legacy_token) = data.get("legacy_token") {
                    // 这里的错误处理至关重要
                    // 如果获取用户信息失败,不能返回成功,必须向客户端返回错误
                    match self.fetch_user_info(legacy_token).await {
                        Ok((user_id, roles)) => {
                            let expiration = Utc::now()
                                .checked_add_signed(chrono::Duration::hours(24))
                                .expect("valid timestamp")
                                .timestamp();

                            let claims = Claims {
                                sub: user_id,
                                exp: expiration as usize,
                                roles,
                            };

                            let header = Header::default();
                            let encoding_key = EncodingKey::from_secret(conf.jwt_secret.as_ref());

                            match encode(&header, &claims, &encoding_key) {
                                Ok(jwt) => {
                                    let new_body = serde_json::json!({ "jwt_token": jwt });
                                    ctx.resp().set_status_code(200);
                                    ctx.resp().set_body(new_body.to_string().into_bytes());
                                }
                                Err(_) => {
                                    // JWT 编码失败是服务器内部错误
                                    ctx.resp().set_status_code(500);
                                    ctx.resp().set_body("Failed to generate token".as_bytes().to_vec());
                                }
                            }
                        }
                        Err(e) => {
                            info!("Failed to fetch user info: {}", e);
                            ctx.resp().set_status_code(503); // Service Unavailable
                            ctx.resp().set_body("Auth backend service is unavailable".as_bytes().to_vec());
                        }
                    }
                }
            }
        }
        FilterStatus::Done
    }
}

// 模拟向Ruby后端请求用户信息
// 在真实项目中,这可能是一个内部RPC调用或对特定API端点的请求
async fn fetch_user_info(&self, legacy_token: &str) -> Result<(String, Vec<String>), reqwest::Error> {
    info!("Fetching user info with legacy token");
    let user_info_url = "http://ruby-app:3000/internal/user_info";
    
    // 这里的HTTP客户端应该从插件实例中获取,以复用连接池
    let response = self.client
        .get(user_info_url)
        .header("X-Legacy-Token", legacy_token)
        .send()
        .await?;

    if response.status().is_success() {
        let user_data: HashMap<String, serde_json::Value> = response.json().await?;
        let user_id = user_data.get("id").and_then(|v| v.as_str()).unwrap_or_default().to_string();
        let roles = user_data.get("roles")
            .and_then(|v| v.as_array())
            .map(|arr| arr.iter().map(|r| r.as_str().unwrap_or_default().to_string()).collect())
            .unwrap_or_default();
            
        Ok((user_id, roles))
    } else {
        // 将HTTP错误转换为一个可处理的错误类型
        Err(response.error_for_status().unwrap_err())
    }
}

// 插件注册
#[no_mangle]
pub extern "C" fn apisix_plugin_init() {
    AuthPlugin::register();
}

这段代码展示了插件的核心逻辑,包括请求/响应阶段的挂载点、JWT的生成与验证、与后端服务的异步HTTP通信,以及关键的错误处理。一个常见的错误是在fetch_user_info失败时未正确处理,导致向客户端返回不一致的状态。这里的实现确保了在后端依赖服务不可用时,会返回503 Service Unavailable,这对于问题的快速定位至关重要。

前端状态管理与工具链

在前端,我们使用MobX来管理认证状态。有了标准的JWT后,状态管理变得非常清晰。

// stores/AuthStore.js
import { makeAutoObservable, runInAction } from 'mobx';
import axios from 'axios';

class AuthStore {
    jwt = localStorage.getItem('jwt_token') || null;
    userInfo = null;
    isAuthenticated = false;
    error = null;

    constructor() {
        makeAutoObservable(this);
        this.checkAuth();
    }

    async login(credentials) {
        try {
            const response = await axios.post('/api/auth/login', credentials);
            const { jwt_token } = response.data;
            
            runInAction(() => {
                this.jwt = jwt_token;
                this.isAuthenticated = true;
                this.error = null;
                localStorage.setItem('jwt_token', jwt_token);
                // 解码JWT获取用户信息,或发起另一个请求
                // const decoded = jwt_decode(jwt_token);
                // this.userInfo = { id: decoded.sub, roles: decoded.roles };
            });
        } catch (err) {
            runInAction(() => {
                this.error = 'Login failed';
                this.isAuthenticated = false;
            });
        }
    }

    logout() {
        this.jwt = null;
        this.userInfo = null;
        this.isAuthenticated = false;
        localStorage.removeItem('jwt_token');
    }
    
    checkAuth() {
        if (this.jwt) {
            // 在生产应用中,还需要验证JWT是否过期
            this.isAuthenticated = true;
        }
    }
}

export const authStore = new AuthStore();

这份MobX store的代码清晰地管理了用户的认证状态和JWT。同时,为了保证大型前端项目的代码质量和一致性,我们引入了Rome作为统一的工具链,它集成了linter、formatter、compiler等功能。在rome.json中配置规则,可以强制团队遵循一致的代码风格和最佳实践,这在多人协作中减少了不必要的代码审查开销。

架构的局限性与未来演进

这个基于APISIX和Rust插件的方案,有效地解决了当前迫在眉睫的性能问题,并实现了对后端服务的无侵入改造。但它并非银弹,依然存在其适用边界和局限性。

首先,Rust外部插件进程本身成为了一个新的关键运维节点。虽然它性能极高,但也需要配套的监控、日志和告警体系,以确保其稳定运行。APISIX虽然会管理其生命周期,但在极端情况下,插件进程的崩溃或无响应仍可能影响服务。

其次,当前的认证流程依然依赖于后端Ruby服务的 /internal/user_info 接口,这在逻辑上并未完全解耦。如果该接口出现故障,新的登录请求将全部失败。这只是将性能瓶颈转换为了一个高可用的依赖关系。

未来的演进路径是明确的:逐步将用户数据和认证逻辑从Ruby monolith中迁移出来。可以先通过CDC(Change Data Capture)技术将用户数据实时同步到一个新的、独立的数据库中。当数据同步稳定后,Rust插件便可以直接查询新数据库,彻底摆脱对旧Ruby应用的运行时依赖。最终,当所有相关业务都迁移完成后,旧的认证模块就可以安全下线。我们当前的架构,正是在为这个最终目标铺平道路,它是一个务实且有效的过渡态。


  目录