AI 学习笔记(五):把 OpenAI 最小示例升级成可复用服务层,补上重试、日志、限流和错误分类

上一篇我们已经跑通了一个最小结构化输出示例:请求发出去,模型按 JSON Schema 返回结果。
这一步很重要,但离真实项目还差一层东西: 可复用的服务层封装

如果你直接把示例代码复制进业务,后面通常会很快遇到这些问题:

  • 每个调用点都在重复初始化 client
  • 超时、重试、日志、错误处理到处散落
  • 一旦出现 429、网络抖动、服务端波动,排查会非常痛苦
  • 调用量上来后,没有基本限流,稳定性会越来越差

所以这一篇不再讲“怎么调一次 API”,而是讲:怎么把最小示例收口成一个项目里能长期复用的 OpenAI 服务层

1. 先把目标定清楚

我希望这层封装至少解决 5 件事:

  1. 统一入口:业务代码不要直接散落调用 SDK。
  2. 超时和重试:把基础稳定性配置收敛到一处。
  3. 日志可观测:至少能看到 request id、耗时、token 和剩余额度。
  4. 基础限流:别在本地批量调用时把自己打进 429。
  5. 错误分类:区分鉴权错误、限流错误、服务端错误、网络错误。

有了这层,后面无论你接结构化输出、普通文本生成,还是做多模型封装,都会轻很多。

2. 先装依赖

示例基于 Node.js 20+ 和 OpenAI 官方 Node SDK:

1
npm i openai

先提醒一点:
官方 SDK 本身已经支持 timeoutmaxRetriesAPIErrorrequest_idwithResponse()
所以这篇不是重复造轮子,而是把这些能力收拢成一个稳定入口。

3. 一个可直接复用的最小服务层

下面这份代码做了几件事:

  • 用统一类封装所有 OpenAI 调用
  • 用简单时间窗做进程内限流
  • 统一日志字段
  • 统一错误分类
  • 保留 validate 钩子,方便后面接 zod / ajv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
import OpenAI from 'openai';

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

class SimpleRateLimiter {
constructor({ minIntervalMs = 800 } = {}) {
this.minIntervalMs = minIntervalMs;
this.nextAvailableAt = 0;
}

async wait() {
const now = Date.now();
const waitMs = Math.max(0, this.nextAvailableAt - now);

this.nextAvailableAt = Math.max(now, this.nextAvailableAt) + this.minIntervalMs;

if (waitMs > 0) {
await sleep(waitMs);
}
}
}

function classifyOpenAIError(error) {
if (error instanceof SyntaxError) {
return {
type: 'parse_error',
retryable: false,
message: '模型输出不是有效 JSON',
};
}

if (error instanceof OpenAI.APIError) {
if (error.status === 401) {
return {
type: 'auth_error',
retryable: false,
message: 'API Key 无效或权限不足',
};
}

if (error.status === 429) {
return {
type: 'rate_limit',
retryable: true,
message: '请求过快或额度受限',
};
}

if (error.status >= 500) {
return {
type: 'server_error',
retryable: true,
message: 'OpenAI 服务端异常',
};
}

return {
type: 'api_error',
retryable: false,
message: error.message,
};
}

if (error?.name === 'APIConnectionTimeoutError') {
return {
type: 'timeout',
retryable: true,
message: '请求超时',
};
}

if (error?.name === 'APIConnectionError') {
return {
type: 'network_error',
retryable: true,
message: '网络连接异常',
};
}

return {
type: 'unknown_error',
retryable: false,
message: error?.message || '未知错误',
};
}

export class OpenAIService {
constructor({
apiKey = process.env.OPENAI_API_KEY,
model = process.env.OPENAI_MODEL || 'gpt-5.2',
timeout = 30_000,
maxRetries = 2,
minIntervalMs = 800,
} = {}) {
this.model = model;
this.client = new OpenAI({
apiKey,
timeout,
maxRetries,
});
this.rateLimiter = new SimpleRateLimiter({ minIntervalMs });
}

async requestJson({
name,
schema,
instructions,
input,
validate,
}) {
await this.rateLimiter.wait();

const startedAt = Date.now();

try {
const { data, response } = await this.client.responses
.create({
model: this.model,
input: [
{ role: 'system', content: instructions },
{ role: 'user', content: input },
],
text: {
format: {
type: 'json_schema',
name,
schema,
strict: true,
},
},
})
.withResponse();

const rawText = data.output_text || '{}';
const parsed = JSON.parse(rawText);
const finalData = validate ? validate(parsed) : parsed;

const meta = {
requestId: data._request_id || response.headers.get('x-request-id'),
durationMs: Date.now() - startedAt,
totalTokens: data.usage?.total_tokens ?? 0,
remainingRequests: response.headers.get('x-ratelimit-remaining-requests'),
remainingTokens: response.headers.get('x-ratelimit-remaining-tokens'),
};

console.info('[openai.success]', meta);

return {
data: finalData,
meta,
};
} catch (error) {
const detail = classifyOpenAIError(error);

console.error('[openai.error]', {
...detail,
requestId: error?.request_id,
durationMs: Date.now() - startedAt,
});

throw Object.assign(new Error(detail.message), {
type: detail.type,
retryable: detail.retryable,
cause: error,
});
}
}
}

4. 业务里怎么调用

为了和上一篇保持连续,这里继续用“生成任务拆解清单”的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import { OpenAIService } from './openai-service.js';

const taskPlanSchema = {
type: 'object',
additionalProperties: false,
properties: {
summary: { type: 'string' },
tasks: {
type: 'array',
items: {
type: 'object',
additionalProperties: false,
properties: {
title: { type: 'string' },
priority: { type: 'string', enum: ['high', 'medium', 'low'] },
},
required: ['title', 'priority'],
},
},
},
required: ['summary', 'tasks'],
};

const service = new OpenAIService({
model: process.env.OPENAI_MODEL || 'gpt-5.2',
timeout: 30_000,
maxRetries: 2,
minIntervalMs: 800,
});

const result = await service.requestJson({
name: 'task_plan',
schema: taskPlanSchema,
instructions: '你是资深技术编辑,输出必须严格符合 JSON Schema。',
input: `
我在维护一个 Hexo 博客,需要新增一篇 AI 学习笔记,
要求包括:选题、示例代码、发布检查清单。
`,
});

console.log(result.data);
console.log(result.meta);

返回值里通常最值得保留的是两部分:

  • data:给业务继续使用
  • meta:给日志、监控和排查使用

这就是“可复用服务层”和“只会调一次 API”的本质差别。

5. 这层封装到底解决了什么

5.1 重试不再到处手写

官方 SDK 已经能对连接问题、超时、4084094295xx 做基础重试。
所以大多数项目里,先把 maxRetries 收口到服务层就够用了。

如果你还要自己额外包一层指数退避,记得先想清楚:
不要把 SDK 内建重试和你自己的外层重试叠太狠,不然一次失败可能被放大成很多次请求。

5.2 日志终于能看懂

很多人接 API 时只在失败时 console.error(error)
这对线上排查基本没用。

至少把这些字段打出来:

  • requestId
  • durationMs
  • totalTokens
  • remainingRequests
  • remainingTokens

这样你在定位“慢在哪里”“为什么频繁 429”“是不是某次请求异常贵”时,才有抓手。

5.3 错误处理从“全都 catch”变成“按类型处理”

把错误分层后,业务侧就能更明确地决定动作:

  • auth_error:直接报警,不要重试
  • rate_limit:延后重试或降速
  • server_error / network_error / timeout:可以进入重试队列
  • parse_error:重点排查 prompt、schema 和本地校验

这一步会直接影响你的自动化流程稳定性。

5.4 限流先做轻量版就够了

这里的 SimpleRateLimiter 只是一个进程内最小限流器,适合本地脚本、小型服务或单实例任务。

如果你后面是:

  • 多实例部署
  • 队列消费
  • 高并发批处理

那就应该升级成更正式的方案,比如基于 Redis 的令牌桶、队列串行化,或者按租户做配额控制。

先有最小限流,再谈复杂限流,通常更符合工程节奏。

6. 落地时再补三件事

这篇先把“服务层稳定性”收好,真要上线,我还建议补三件事:

  1. 本地 schema 校验
    不要只信模型返回,继续用 zodajv 或已有校验器兜一次。

  2. 敏感日志脱敏
    日志里不要直接落完整用户输入、密钥、隐私字段。

  3. 统一业务降级
    当模型调用失败时,要定义默认返回、重试上限和告警策略,不要把异常直接甩到前端页面。

总结

从“跑通一次 API”到“接进真实项目”,中间最关键的一步,就是补一层可复用服务层。

这一层的价值不在于代码多高级,而在于它帮你把下面这些问题提前收口:

  • 配置散落
  • 错误失控
  • 日志不可追
  • 调用无节制

一句话总结:
模型能力决定上限,服务层封装决定你能不能稳定落地。

下一篇我会继续沿着 Phase 2 往前走:
把单一 OpenAI 封装推进到多 provider 接入,开始对比 OpenAI、Claude、DeepSeek 的接入差异。

参考资料

本文永久链接: https://www.mulianju.com/learning-notes/ai-learning-notes-openai-service-wrapper/