无服务器驱动
Prisma Postgres Serverless 驱动 (@prisma/ppg) 是一个轻量级的客户端,用于使用原生 SQL 连接到 Prisma Postgres。它使用 HTTP 和 WebSocket 协议而不是传统的 TCP 连接,从而可以在无法运行原生 PostgreSQL 驱动的受限环境中访问数据库。
Prisma Postgres Serverless 驱动目前处于早期访问阶段,不建议用于生产环境。
主要功能
Serverless 驱动使用 HTTP 和 WebSocket 协议代替 TCP,从而可以在无法运行传统 PostgreSQL 驱动的环境中访问数据库。
- 兼容 Cloudflare Workers、Vercel Edge Functions、Deno Deploy、AWS Lambda、Bun 和浏览器
- 逐行传输结果,以恒定的内存使用量处理大型数据集
- 通过单个连接管道传输多个查询,将延迟降低高达 3 倍
- 具有自动参数化和完整 TypeScript 支持的 SQL 模板字面量
- 内置事务、批处理操作和可扩展的类型系统
- 在所有可用的 Prisma Postgres 区域之间自动连接池,以获得最佳性能
此驱动适用于没有完整 Node.js 支持的边缘/无服务器环境,或者在处理需要流式传输的大型结果集时。
对于标准 Node.js 环境,请使用node-postgres 驱动,以通过直接 TCP 连接获得更低的延迟。
先决条件:获取连接字符串
Serverless 驱动需要 Prisma Postgres Direct TCP 连接 URL。
postgres://identifier:key@db.prisma.io:5432/postgres?sslmode=require
在您的 API Keys 部分中查找此项。连接字符串仅用于提取身份验证凭据。客户端不会建立直接的 TCP 连接。
如果您没有 Prisma Postgres 数据库,请使用create-db CLI 工具创建一个。
npx prisma create-db
安装
根据您的用例安装相应的软件包
- 不使用 Prisma ORM
- 使用 Prisma ORM
npm install @prisma/ppg
npm install @prisma/ppg @prisma/adapter-ppg
用法
使用 SQL 模板字面量进行查询
使用带有 SQL 模板字面量和自动参数化的 prismaPostgres() 高级 API
import { prismaPostgres, defaultClientConfig } from "@prisma/ppg"
const ppg = prismaPostgres(
defaultClientConfig(process.env.PRISMA_DIRECT_TCP_URL!)
)
type User = { id: number; name: string; email: string }
// SQL template literals with automatic parameterization
const users = await ppg.sql<User>`
SELECT * FROM users WHERE email = ${'user@example.com'}
`.collect()
console.log(users[0].name)
与 Prisma ORM 一起使用
使用 PrismaPostgresAdapter 通过无服务器驱动连接 Prisma Client
import { PrismaClient } from '../generated/prisma/client'
import { PrismaPostgresAdapter } from '@prisma/adapter-ppg'
const prisma = new PrismaClient({
adapter: new PrismaPostgresAdapter({
connectionString: process.env.PRISMA_DIRECT_TCP_URL,
}),
})
const users = await prisma.user.findMany()
流式传输结果
结果以 CollectableIterator 的形式返回。逐行流式传输以实现恒定的内存使用,或将所有行收集到一个数组中
type User = { id: number; name: string; email: string }
// Stream rows one at a time (constant memory usage)
for await (const user of ppg.sql<User>`SELECT * FROM users`) {
console.log(user.name)
}
// Or collect all rows into an array
const allUsers = await ppg.sql<User>`SELECT * FROM users`.collect()
管道查询
通过单个 WebSocket 连接发送多个查询,无需等待响应。查询立即发送,结果以 FIFO 顺序到达
import { client, defaultClientConfig } from "@prisma/ppg"
const cl = client(defaultClientConfig(process.env.PRISMA_DIRECT_TCP_URL!))
const session = await cl.newSession()
// Send all queries immediately (pipelined)
const [usersResult, ordersResult, productsResult] = await Promise.all([
session.query("SELECT * FROM users"),
session.query("SELECT * FROM orders"),
session.query("SELECT * FROM products")
])
session.close()
在 100 毫秒的网络延迟下,3 个顺序查询需要 300 毫秒(3 x RTT),而管道查询仅需 100 毫秒(1 x RTT)。
参数流式传输
超过 1KB 的参数会自动流式传输,而无需在内存中缓冲。对于大型二进制参数,您必须使用 boundedByteStreamParameter(),它会创建一个 BoundedByteStreamParameter 对象,该对象携带 PostgreSQL 协议所需的总字节大小
import { client, defaultClientConfig, boundedByteStreamParameter, BINARY } from "@prisma/ppg"
const cl = client(defaultClientConfig(process.env.PRISMA_DIRECT_TCP_URL!))
// Large binary data (e.g., file content)
const stream = getReadableStream() // Your ReadableStream source
const totalSize = 1024 * 1024 // Total size must be known in advance
// Create a bounded byte stream parameter
const streamParam = boundedByteStreamParameter(stream, BINARY, totalSize)
// Automatically streamed - constant memory usage
await cl.query("INSERT INTO files (data) VALUES ($1)", streamParam)
对于 Uint8Array 数据,请使用 byteArrayParameter()
import { client, defaultClientConfig, byteArrayParameter, BINARY } from "@prisma/ppg"
const cl = client(defaultClientConfig(process.env.PRISMA_DIRECT_TCP_URL!))
const bytes = new Uint8Array([1, 2, 3, 4])
const param = byteArrayParameter(bytes, BINARY)
await cl.query("INSERT INTO files (data) VALUES ($1)", param)
boundedByteStreamParameter() 函数由 @prisma/ppg 库提供,由于 PostgreSQL 协议要求,需要提前知道总字节大小。
事务和批处理操作
事务自动处理 BEGIN、COMMIT 和 ROLLBACK
const result = await ppg.transaction(async (tx) => {
await tx.sql.exec`INSERT INTO users (name) VALUES ('Alice')`
const users = await tx.sql<User>`SELECT * FROM users WHERE name = 'Alice'`.collect()
return users[0].name
})
批处理操作在自动事务中单次往返执行多个语句
const [users, affected] = await ppg.batch<[User[], number]>(
{ query: "SELECT * FROM users WHERE id < $1", parameters: [5] },
{ exec: "INSERT INTO users (name) VALUES ($1)", parameters: ["Charlie"] }
)
类型处理
使用 defaultClientConfig() 时,常见的 PostgreSQL 类型会自动解析(boolean、int2、int4、int8、float4、float8、text、varchar、json、jsonb、date、timestamp、timestamptz)
import { prismaPostgres, defaultClientConfig } from "@prisma/ppg"
const ppg = prismaPostgres(defaultClientConfig(process.env.PRISMA_DIRECT_TCP_URL!))
// JSON/JSONB automatically parsed
const rows = await ppg.sql<{ data: { key: string } }>`
SELECT '{"key": "value"}'::jsonb as data
`.collect()
console.log(rows[0].data.key) // "value"
// BigInt parsed to JavaScript BigInt
const bigints = await ppg.sql<{ big: bigint }>`SELECT 9007199254740991::int8 as big`.collect()
// Dates parsed to Date objects
const dates = await ppg.sql<{ created: Date }>`SELECT NOW() as created`.collect()
自定义解析器和序列化器
使用自定义解析器(按 PostgreSQL OID)和序列化器(按类型保护)扩展或覆盖类型系统
import { client, defaultClientConfig } from "@prisma/ppg"
import type { ValueParser } from "@prisma/ppg"
// Custom parser for UUID type
const uuidParser: ValueParser<string> = {
oid: 2950,
parse: (value) => value ? value.toUpperCase() : null
}
const config = defaultClientConfig(process.env.PRISMA_DIRECT_TCP_URL!)
const cl = client({
...config,
parsers: [...config.parsers, uuidParser] // Append to defaults
})
对于自定义序列化器,请将其置于默认值之前,以便它们优先
import type { ValueSerializer } from "@prisma/ppg"
class Point { constructor(public x: number, public y: number) {} }
const pointSerializer: ValueSerializer<Point> = {
supports: (value: unknown): value is Point => value instanceof Point,
serialize: (value: Point) => `(${value.x},${value.y})`
}
const config = defaultClientConfig(process.env.PRISMA_DIRECT_TCP_URL!)
const cl = client({
...config,
serializers: [pointSerializer, ...config.serializers] // Your serializer first
})
await cl.query("INSERT INTO locations (point) VALUES ($1)", new Point(10, 20))
有关更多详细信息,请参阅npm 包文档。
平台兼容性
该驱动程序适用于任何具有 fetch 和 WebSocket API 的环境
| 平台 | HTTP 传输 | WebSocket 传输 |
|---|---|---|
| Cloudflare Workers | ✅ | ✅ |
| Vercel Edge Functions | ✅ | ✅ |
| AWS Lambda | ✅ | ✅ |
| Deno Deploy | ✅ | ✅ |
| Bun | ✅ | ✅ |
| Node.js 18+ | ✅ | ✅ |
| 浏览器 | ✅ | ✅(带 CORS) |
传输模式
- HTTP 传输(无状态): 每个查询都是一个独立的 HTTP 请求。最适合简单查询和边缘函数。
- WebSocket 传输(有状态): 用于多路复用查询的持久连接。最适合事务、管道和多个查询。使用
client().newSession()创建会话。
API 概览
prismaPostgres(config)
具有 SQL 模板字面量、事务和批处理操作的高级 API。推荐用于大多数用例。
client(config)
具有显式参数传递和会话管理的低级 API。在需要精细控制时使用。
有关完整的 API 文档,请参阅npm 包。
错误处理
提供了结构化的错误类型:DatabaseError、HttpResponseError、WebSocketError、ValidationError。
import { DatabaseError } from "@prisma/ppg"
try {
await ppg.sql`SELECT * FROM invalid_table`.collect()
} catch (error) {
if (error instanceof DatabaseError) {
console.log(error.code)
}
}
默认启用连接池
Serverless 驱动程序自动使用所有可用 Prisma Postgres 区域的连接池,以实现最佳性能和资源利用率。
连接池默认启用,无需额外配置。
这确保了无论您的部署区域如何,都能高效地连接数据库,减少连接开销并提高查询性能。
限制
- 需要 Prisma Postgres 实例,不适用于本地开发数据库
- 目前处于早期访问阶段,不建议用于生产环境