跳至主要内容

无服务器驱动

Prisma Postgres Serverless 驱动 (@prisma/ppg) 是一个轻量级的客户端,用于使用原生 SQL 连接到 Prisma Postgres。它使用 HTTP 和 WebSocket 协议而不是传统的 TCP 连接,从而可以在无法运行原生 PostgreSQL 驱动的受限环境中访问数据库。

警告

Prisma Postgres Serverless 驱动目前处于早期访问阶段,不建议用于生产环境。

主要功能

Serverless 驱动使用 HTTP 和 WebSocket 协议代替 TCP,从而可以在无法运行传统 PostgreSQL 驱动的环境中访问数据库。

  • 兼容 Cloudflare Workers、Vercel Edge Functions、Deno Deploy、AWS Lambda、Bun 和浏览器
  • 逐行传输结果,以恒定的内存使用量处理大型数据集
  • 通过单个连接管道传输多个查询,将延迟降低高达 3 倍
  • 具有自动参数化和完整 TypeScript 支持的 SQL 模板字面量
  • 内置事务、批处理操作和可扩展的类型系统
  • 所有可用的 Prisma Postgres 区域之间自动连接池,以获得最佳性能

此驱动适用于没有完整 Node.js 支持的边缘/无服务器环境,或者在处理需要流式传输的大型结果集时。

对于标准 Node.js 环境,请使用node-postgres 驱动,以通过直接 TCP 连接获得更低的延迟。

先决条件:获取连接字符串

Serverless 驱动需要 Prisma Postgres Direct TCP 连接 URL。

postgres://identifier:key@db.prisma.io:5432/postgres?sslmode=require

在您的 API Keys 部分中查找此项。连接字符串仅用于提取身份验证凭据。客户端不会建立直接的 TCP 连接。

如果您没有 Prisma Postgres 数据库,请使用create-db CLI 工具创建一个。

npx prisma create-db

安装

根据您的用例安装相应的软件包

npm install @prisma/ppg 

用法

使用 SQL 模板字面量进行查询

使用带有 SQL 模板字面量和自动参数化的 prismaPostgres() 高级 API

import { prismaPostgres, defaultClientConfig } from "@prisma/ppg"

const ppg = prismaPostgres(
defaultClientConfig(process.env.PRISMA_DIRECT_TCP_URL!)
)

type User = { id: number; name: string; email: string }

// SQL template literals with automatic parameterization
const users = await ppg.sql<User>`
SELECT * FROM users WHERE email = ${'user@example.com'}
`.collect()

console.log(users[0].name)

与 Prisma ORM 一起使用

使用 PrismaPostgresAdapter 通过无服务器驱动连接 Prisma Client

import { PrismaClient } from '../generated/prisma/client'
import { PrismaPostgresAdapter } from '@prisma/adapter-ppg'

const prisma = new PrismaClient({
adapter: new PrismaPostgresAdapter({
connectionString: process.env.PRISMA_DIRECT_TCP_URL,
}),
})

const users = await prisma.user.findMany()

流式传输结果

结果以 CollectableIterator 的形式返回。逐行流式传输以实现恒定的内存使用,或将所有行收集到一个数组中

type User = { id: number; name: string; email: string }

// Stream rows one at a time (constant memory usage)
for await (const user of ppg.sql<User>`SELECT * FROM users`) {
console.log(user.name)
}

// Or collect all rows into an array
const allUsers = await ppg.sql<User>`SELECT * FROM users`.collect()

管道查询

通过单个 WebSocket 连接发送多个查询,无需等待响应。查询立即发送,结果以 FIFO 顺序到达

import { client, defaultClientConfig } from "@prisma/ppg"

const cl = client(defaultClientConfig(process.env.PRISMA_DIRECT_TCP_URL!))
const session = await cl.newSession()

// Send all queries immediately (pipelined)
const [usersResult, ordersResult, productsResult] = await Promise.all([
session.query("SELECT * FROM users"),
session.query("SELECT * FROM orders"),
session.query("SELECT * FROM products")
])

session.close()

在 100 毫秒的网络延迟下,3 个顺序查询需要 300 毫秒(3 x RTT),而管道查询仅需 100 毫秒(1 x RTT)。

参数流式传输

超过 1KB 的参数会自动流式传输,而无需在内存中缓冲。对于大型二进制参数,您必须使用 boundedByteStreamParameter(),它会创建一个 BoundedByteStreamParameter 对象,该对象携带 PostgreSQL 协议所需的总字节大小

import { client, defaultClientConfig, boundedByteStreamParameter, BINARY } from "@prisma/ppg"

const cl = client(defaultClientConfig(process.env.PRISMA_DIRECT_TCP_URL!))

// Large binary data (e.g., file content)
const stream = getReadableStream() // Your ReadableStream source
const totalSize = 1024 * 1024 // Total size must be known in advance

// Create a bounded byte stream parameter
const streamParam = boundedByteStreamParameter(stream, BINARY, totalSize)

// Automatically streamed - constant memory usage
await cl.query("INSERT INTO files (data) VALUES ($1)", streamParam)

对于 Uint8Array 数据,请使用 byteArrayParameter()

import { client, defaultClientConfig, byteArrayParameter, BINARY } from "@prisma/ppg"

const cl = client(defaultClientConfig(process.env.PRISMA_DIRECT_TCP_URL!))

const bytes = new Uint8Array([1, 2, 3, 4])
const param = byteArrayParameter(bytes, BINARY)

await cl.query("INSERT INTO files (data) VALUES ($1)", param)

boundedByteStreamParameter() 函数由 @prisma/ppg 库提供,由于 PostgreSQL 协议要求,需要提前知道总字节大小。

事务和批处理操作

事务自动处理 BEGIN、COMMIT 和 ROLLBACK

const result = await ppg.transaction(async (tx) => {
await tx.sql.exec`INSERT INTO users (name) VALUES ('Alice')`
const users = await tx.sql<User>`SELECT * FROM users WHERE name = 'Alice'`.collect()
return users[0].name
})

批处理操作在自动事务中单次往返执行多个语句

const [users, affected] = await ppg.batch<[User[], number]>(
{ query: "SELECT * FROM users WHERE id < $1", parameters: [5] },
{ exec: "INSERT INTO users (name) VALUES ($1)", parameters: ["Charlie"] }
)

类型处理

使用 defaultClientConfig() 时,常见的 PostgreSQL 类型会自动解析(booleanint2int4int8float4float8textvarcharjsonjsonbdatetimestamptimestamptz

import { prismaPostgres, defaultClientConfig } from "@prisma/ppg"

const ppg = prismaPostgres(defaultClientConfig(process.env.PRISMA_DIRECT_TCP_URL!))

// JSON/JSONB automatically parsed
const rows = await ppg.sql<{ data: { key: string } }>`
SELECT '{"key": "value"}'::jsonb as data
`.collect()
console.log(rows[0].data.key) // "value"

// BigInt parsed to JavaScript BigInt
const bigints = await ppg.sql<{ big: bigint }>`SELECT 9007199254740991::int8 as big`.collect()

// Dates parsed to Date objects
const dates = await ppg.sql<{ created: Date }>`SELECT NOW() as created`.collect()

自定义解析器和序列化器

使用自定义解析器(按 PostgreSQL OID)和序列化器(按类型保护)扩展或覆盖类型系统

import { client, defaultClientConfig } from "@prisma/ppg"
import type { ValueParser } from "@prisma/ppg"

// Custom parser for UUID type
const uuidParser: ValueParser<string> = {
oid: 2950,
parse: (value) => value ? value.toUpperCase() : null
}

const config = defaultClientConfig(process.env.PRISMA_DIRECT_TCP_URL!)
const cl = client({
...config,
parsers: [...config.parsers, uuidParser] // Append to defaults
})

对于自定义序列化器,请将其置于默认值之前,以便它们优先

import type { ValueSerializer } from "@prisma/ppg"

class Point { constructor(public x: number, public y: number) {} }

const pointSerializer: ValueSerializer<Point> = {
supports: (value: unknown): value is Point => value instanceof Point,
serialize: (value: Point) => `(${value.x},${value.y})`
}

const config = defaultClientConfig(process.env.PRISMA_DIRECT_TCP_URL!)
const cl = client({
...config,
serializers: [pointSerializer, ...config.serializers] // Your serializer first
})

await cl.query("INSERT INTO locations (point) VALUES ($1)", new Point(10, 20))

有关更多详细信息,请参阅npm 包文档

平台兼容性

该驱动程序适用于任何具有 fetchWebSocket API 的环境

平台HTTP 传输WebSocket 传输
Cloudflare Workers
Vercel Edge Functions
AWS Lambda
Deno Deploy
Bun
Node.js 18+
浏览器✅(带 CORS)

传输模式

  • HTTP 传输(无状态): 每个查询都是一个独立的 HTTP 请求。最适合简单查询和边缘函数。
  • WebSocket 传输(有状态): 用于多路复用查询的持久连接。最适合事务、管道和多个查询。使用 client().newSession() 创建会话。

API 概览

prismaPostgres(config)

具有 SQL 模板字面量、事务和批处理操作的高级 API。推荐用于大多数用例。

client(config)

具有显式参数传递和会话管理的低级 API。在需要精细控制时使用。

有关完整的 API 文档,请参阅npm 包

错误处理

提供了结构化的错误类型:DatabaseErrorHttpResponseErrorWebSocketErrorValidationError

import { DatabaseError } from "@prisma/ppg"

try {
await ppg.sql`SELECT * FROM invalid_table`.collect()
} catch (error) {
if (error instanceof DatabaseError) {
console.log(error.code)
}
}

默认启用连接池

Serverless 驱动程序自动使用所有可用 Prisma Postgres 区域的连接池,以实现最佳性能和资源利用率。

连接池默认启用,无需额外配置。

这确保了无论您的部署区域如何,都能高效地连接数据库,减少连接开销并提高查询性能。

限制

  • 需要 Prisma Postgres 实例,不适用于本地开发数据库
  • 目前处于早期访问阶段,不建议用于生产环境

了解更多

© . This site is unofficial and not affiliated with Prisma Data, Inc.