跳至主要内容

事务和批量查询

数据库事务是指一系列读/写操作,这些操作保证要么整体成功,要么整体失败。本节将介绍 Prisma Client API 支持事务的方式。

事务概览

信息

在 Prisma ORM 版本 4.4.0 之前,无法在事务上设置隔离级别。数据库配置中的隔离级别始终适用。

开发者通过将操作封装在事务中,利用数据库提供的安全保证。这些保证通常用 ACID 缩写来概括

  • 原子性(Atomic):确保事务的所有操作成功或所有操作失败。事务要么成功提交,要么中止回滚
  • 一致性(Consistent):确保事务前后数据库的状态是有效的(即保持了数据上的任何现有不变性)。
  • 隔离性(Isolated):确保并发运行的事务具有与串行运行相同的效果。
  • 持久性(Durability):确保事务成功后,所有写入都持久存储。

尽管这些属性中的每一个都有很多模糊性和细微之处(例如,一致性实际上可以被视为应用层面的责任而不是数据库属性,或者隔离通常通过更强和更弱的隔离级别来保证),但总的来说,它们为开发者在考虑数据库事务时提供了良好的高级期望指南。

“事务是一个抽象层,它允许应用程序假装某些并发问题以及某些硬件和软件故障不存在。一大类错误被简化为简单的事务中止,应用程序只需要重试即可。” Designing Data-Intensive Applications, Martin Kleppmann

Prisma Client 支持六种不同的事务处理方式,适用于三种不同的场景

场景可用技术
依赖写入
  • 嵌套写入
独立写入
  • $transaction([]) API
  • 批量操作
读取、修改、写入
  • 幂等操作
  • 乐观并发控制
  • 交互式事务

您选择的技术取决于您的具体用例。

注意:为了本指南的目的,写入数据库包括创建、更新和删除数据。

关于 Prisma Client 中的事务

Prisma Client 提供以下选项来使用事务

  • 嵌套写入:使用 Prisma Client API 在同一个事务中处理一个或多个相关记录上的多个操作。
  • 批量/大容量事务:使用 updateManydeleteManycreateMany 批量处理一个或多个操作。
  • Prisma Client 中的 $transaction API
    • 顺序操作:将 Prisma Client 查询数组传递给 $transaction<R>(queries: PrismaPromise<R>[]): Promise<R[]>,在事务中顺序执行。
    • 交互式事务:传递一个可以包含用户代码的函数,包括 Prisma Client 查询、非 Prisma 代码和其他控制流,以在事务中执行,使用 $transaction<R>(fn: (prisma: PrismaClient) => R, options?: object): R

嵌套写入

嵌套写入允许您使用单个 Prisma Client API 调用执行多个操作,这些操作涉及多个相关记录。例如,创建用户以及帖子,或者更新订单以及发票。Prisma Client 确保所有操作要么整体成功,要么整体失败。

以下示例演示了使用 create 的嵌套写入

// Create a new user with two posts in a
// single transaction
const newUser: User = await prisma.user.create({
data: {
email: 'alice@prisma.io',
posts: {
create: [
{ title: 'Join the Prisma Discord at https://pris.ly/discord' },
{ title: 'Follow @prisma on Twitter' },
],
},
},
})

以下示例演示了使用 update 的嵌套写入

// Change the author of a post in a single transaction
const updatedPost: Post = await prisma.post.update({
where: { id: 42 },
data: {
author: {
connect: { email: 'alice@prisma.io' },
},
},
})

批量/大容量操作

以下批量操作作为事务运行

  • createMany()
  • createManyAndReturn()
  • updateMany()
  • updateManyAndReturn()
  • deleteMany()

有关更多示例,请参阅批量操作部分。

$transaction API

$transaction API 可以通过两种方式使用

  • 顺序操作:传递一个 Prisma Client 查询数组,以在事务中顺序执行。

    $transaction<R>(queries: PrismaPromise<R>[]): Promise<R[]>

  • 交互式事务:传递一个函数,该函数可以包含用户代码,包括 Prisma Client 查询、非 Prisma 代码和其他控制流,以在事务中执行。

    $transaction<R>(fn: (prisma: PrismaClient) => R): R

顺序 Prisma Client 操作

以下查询返回与所提供过滤器匹配的所有帖子以及所有帖子的计数

const [posts, totalPosts] = await prisma.$transaction([
prisma.post.findMany({ where: { title: { contains: 'prisma' } } }),
prisma.post.count(),
])

您也可以在 $transaction 中使用原始查询

import { selectUserTitles, updateUserName } from '@prisma/client/sql'

const [userList, updateUser] = await prisma.$transaction([
prisma.$queryRawTyped(selectUserTitles()),
prisma.$queryRawTyped(updateUserName(2)),
])

不是在执行每个操作时立即等待结果,而是将操作本身首先存储在一个变量中,然后通过名为 $transaction 的方法提交到数据库。Prisma Client 将确保所有三个 create 操作要么都成功,要么都失败。

注意:操作按照它们在事务中放置的顺序执行。在事务中使用查询不影响查询本身的操作顺序。

有关更多示例,请参阅事务 API 部分。

从版本 4.4.0 开始,顺序操作事务 API 有第二个参数。您可以在此参数中使用以下可选配置选项

  • isolationLevel:设置事务隔离级别。默认情况下,此值设置为数据库中当前配置的值。

例如

await prisma.$transaction(
[
prisma.resource.deleteMany({ where: { name: 'name' } }),
prisma.resource.createMany({ data }),
],
{
isolationLevel: Prisma.TransactionIsolationLevel.Serializable, // optional, default defined by database configuration
}
)

交互式事务

概述

有时您需要对事务中执行的查询进行更多控制。交互式事务旨在为您提供一个应急出口。

信息

交互式事务已从版本 4.7.0 开始全面可用。

如果您在版本 2.29.0 到 4.6.1(包括)之间使用预览版交互式事务,则需要将 interactiveTransactions 预览功能添加到 Prisma schema 的生成器块中。

要使用交互式事务,您可以将异步函数传递给$transaction

传递给此异步函数的第一个参数是 Prisma Client 实例。下面,我们将此实例称为 tx。在此 tx 实例上调用的任何 Prisma Client 调用都封装在事务中。

警告

谨慎使用交互式事务。长时间保持事务打开会损害数据库性能,甚至可能导致死锁。尽量避免在事务函数中执行网络请求和慢查询。我们建议您尽快进出!

示例

让我们看一个例子

想象一下您正在构建一个在线银行系统。要执行的操作之一是将钱从一个人发送到另一个人。

作为经验丰富的开发者,我们希望确保在转账过程中,

  • 金额不会消失
  • 金额不会翻倍

这是一个非常适合交互式事务的用例,因为我们需要在写入之间执行逻辑来检查余额。

在下面的示例中,爱丽丝和鲍勃的账户中各有 100 美元。如果他们尝试发送超过他们拥有的钱,转账将被拒绝。

爱丽丝应该能够进行 1 次 100 美元的转账,而另一次转账将被拒绝。这将导致爱丽丝拥有 0 美元,鲍勃拥有 200 美元。

import { PrismaClient } from '../prisma/generated/client'
const prisma = new PrismaClient()

function transfer(from: string, to: string, amount: number) {
return prisma.$transaction(async (tx) => {
// 1. Decrement amount from the sender.
const sender = await tx.account.update({
data: {
balance: {
decrement: amount,
},
},
where: {
email: from,
},
})

// 2. Verify that the sender's balance didn't go below zero.
if (sender.balance < 0) {
throw new Error(`${from} doesn't have enough to send ${amount}`)
}

// 3. Increment the recipient's balance by amount
const recipient = await tx.account.update({
data: {
balance: {
increment: amount,
},
},
where: {
email: to,
},
})

return recipient
})
}

async function main() {
// This transfer is successful
await transfer('alice@prisma.io', 'bob@prisma.io', 100)
// This transfer fails because Alice doesn't have enough funds in her account
await transfer('alice@prisma.io', 'bob@prisma.io', 100)
}

main()

在上面的示例中,两个 update 查询都在数据库事务中运行。当应用程序到达函数末尾时,事务将提交到数据库。

如果您的应用程序在此过程中遇到错误,异步函数将抛出异常并自动回滚事务。

要捕获异常,您可以将 $transaction 包装在 try-catch 块中

try {
await prisma.$transaction(async (tx) => {
// Code running in a transaction...
})
} catch (err) {
// Handle the rollback...
}

事务选项

事务 API 有第二个参数。对于交互式事务,您可以在此参数中使用以下可选配置选项

  • maxWait:Prisma Client 等待从数据库获取事务的最长时间。默认值为 2 秒。
  • timeout:交互式事务在被取消和回滚之前可以运行的最长时间。默认值为 5 秒。
  • isolationLevel:设置事务隔离级别。默认情况下,此值设置为数据库中当前配置的值。

例如

await prisma.$transaction(
async (tx) => {
// Code running in a transaction...
},
{
maxWait: 5000, // default: 2000
timeout: 10000, // default: 5000
isolationLevel: Prisma.TransactionIsolationLevel.Serializable, // optional, default defined by database configuration
}
)

您也可以在构造函数级别全局设置这些

const prisma = new PrismaClient({
transactionOptions: {
isolationLevel: Prisma.TransactionIsolationLevel.Serializable,
maxWait: 5000, // default: 2000
timeout: 10000, // default: 5000
},
})

事务隔离级别

信息

此功能在 MongoDB 上不可用,因为 MongoDB 不支持隔离级别。

您可以为事务设置事务隔离级别

信息

此功能在以下 Prisma ORM 版本中可用:交互式事务从版本 4.2.0 开始,顺序操作从版本 4.4.0 开始。

在 4.2.0 之前(对于交互式事务)或 4.4.0 之前(对于顺序操作)的版本中,您无法在 Prisma ORM 级别配置事务隔离级别。Prisma ORM 不会明确设置隔离级别,因此使用数据库中配置的隔离级别

设置隔离级别

要设置事务隔离级别,请在 API 的第二个参数中使用 isolationLevel 选项。

对于顺序操作

await prisma.$transaction(
[
// Prisma Client operations running in a transaction...
],
{
isolationLevel: Prisma.TransactionIsolationLevel.Serializable, // optional, default defined by database configuration
}
)

对于交互式事务

await prisma.$transaction(
async (prisma) => {
// Code running in a transaction...
},
{
isolationLevel: Prisma.TransactionIsolationLevel.Serializable, // optional, default defined by database configuration
maxWait: 5000, // default: 2000
timeout: 10000, // default: 5000
}
)

支持的隔离级别

如果底层数据库支持,Prisma Client 支持以下隔离级别

  • 读未提交(ReadUncommitted)
  • 读已提交(ReadCommitted)
  • 可重复读(RepeatableRead)
  • 快照(Snapshot)
  • 串行化(Serializable)

每个数据库连接器可用的隔离级别如下

数据库读未提交(ReadUncommitted)读已提交(ReadCommitted)可重复读(RepeatableRead)快照(Snapshot)串行化(Serializable)
PostgreSQL✔️✔️✔️✔️
MySQL✔️✔️✔️✔️
SQL Server✔️✔️✔️✔️✔️
CockroachDB✔️
SQLite✔️

默认情况下,Prisma Client 将隔离级别设置为数据库中当前配置的值。

每个数据库中默认配置的隔离级别如下

数据库默认值
PostgreSQL读已提交(ReadCommitted)
MySQL可重复读(RepeatableRead)
SQL Server读已提交(ReadCommitted)
CockroachDB串行化(Serializable)
SQLite串行化(Serializable)

数据库特定的隔离级别信息

请参阅以下资源

CockroachDB 和 SQLite 仅支持 Serializable 隔离级别。

事务时序问题

信息
  • 本节中的解决方案不适用于 MongoDB,因为 MongoDB 不支持隔离级别
  • 本节讨论的时序问题不适用于 CockroachDB 和 SQLite,因为这些数据库只支持最高的 Serializable 隔离级别。

当两个或多个事务在某些隔离级别下并发运行时,时序问题可能导致写入冲突或死锁,例如违反唯一约束。例如,考虑以下事件序列,其中事务 A 和事务 B 都尝试执行 deleteManycreateMany 操作

  1. 事务 B:createMany 操作创建一组新行。
  2. 事务 B:应用程序提交事务 B。
  3. 事务 A:createMany 操作。
  4. 事务 A:应用程序提交事务 A。新行与事务 B 在步骤 2 中添加的行发生冲突。

此冲突可能发生在 ReadCommited 隔离级别,这是 PostgreSQL 和 Microsoft SQL Server 中的默认隔离级别。为了避免此问题,您可以设置更高的隔离级别(RepeatableReadSerializable)。您可以在事务上设置隔离级别。这会覆盖该事务的数据库隔离级别。

为了避免事务写入冲突和死锁

  1. 在您的事务上,将 isolationLevel 参数设置为 Prisma.TransactionIsolationLevel.Serializable

    这可确保您的应用程序提交多个并发或并行事务,就像它们是串行运行的一样。当事务因写入冲突或死锁而失败时,Prisma Client 将返回 P2034 错误

  2. 在您的应用程序代码中,在事务周围添加重试以处理任何 P2034 错误,如以下示例所示

    import { Prisma, PrismaClient } from '../prisma/generated/client'

    const prisma = new PrismaClient()
    async function main() {
    const MAX_RETRIES = 5
    let retries = 0

    let result
    while (retries < MAX_RETRIES) {
    try {
    result = await prisma.$transaction(
    [
    prisma.user.deleteMany({
    where: {
    /** args */
    },
    }),
    prisma.post.createMany({
    data: {
    /** args */
    },
    }),
    ],
    {
    isolationLevel: Prisma.TransactionIsolationLevel.Serializable,
    }
    )
    break
    } catch (error) {
    if (error.code === 'P2034') {
    retries++
    continue
    }
    throw error
    }
    }
    }

Promise.all() 中使用 $transaction

如果您将 $transaction 包装在 Promise.all() 调用中,事务内部的查询将串行执行(即一个接一个)

await prisma.$transaction(async (prisma) => {
await Promise.all([
prisma.user.findMany(),
prisma.user.findMany(),
prisma.user.findMany(),
prisma.user.findMany(),
prisma.user.findMany(),
prisma.user.findMany(),
prisma.user.findMany(),
prisma.user.findMany(),
prisma.user.findMany(),
prisma.user.findMany(),
])
})

这可能与直觉相反,因为 Promise.all() 通常会并行化传递给它的调用。

这种行为的原因是

  • 一个事务意味着其中的所有查询都必须在同一个连接上运行。
  • 一个数据库连接一次只能执行一个查询。
  • 由于一个查询在工作时会阻塞连接,因此将事务放入 Promise.all 实际上意味着查询应该一个接一个地运行。

依赖写入

如果写入操作满足以下条件,则被视为相互依赖

  • 操作依赖于前一个操作的结果(例如,数据库生成 ID)

最常见的场景是创建一条记录并使用生成的 ID 创建或更新相关记录。示例包括

  • 创建一个用户和两篇相关的博客文章(一对多关系)- 在创建博客文章之前必须知道作者 ID
  • 创建一个团队并分配成员(多对多关系)- 在分配成员之前必须知道团队 ID

依赖写入必须一起成功,以维护数据一致性并防止意外行为,例如没有作者的博客文章或没有成员的团队。

嵌套写入

Prisma Client 解决依赖写入的方法是嵌套写入功能,它受 createupdate 支持。以下嵌套写入创建一个用户和两篇博客文章

const nestedWrite = await prisma.user.create({
data: {
email: 'imani@prisma.io',
posts: {
create: [
{ title: 'My first day at Prisma' },
{ title: 'How to configure a unique constraint in PostgreSQL' },
],
},
},
})

如果任何操作失败,Prisma Client 将回滚整个事务。目前,嵌套写入不支持顶级批量操作,如 client.user.deleteManyclient.user.updateMany

何时使用嵌套写入

如果满足以下条件,请考虑使用嵌套写入:

  • ✔ 您希望同时创建两个或多个通过 ID 相关的记录(例如,创建博客文章和用户)
  • ✔ 您希望同时更新和创建通过 ID 相关的记录(例如,更改用户姓名并创建新的博客文章)

场景:注册流程

考虑 Slack 的注册流程,它

  1. 创建一个团队
  2. 将一个用户添加到该团队,该用户自动成为该团队的管理员

此场景可以通过以下模式表示——请注意,用户可以属于多个团队,团队可以有多个用户(多对多关系)

model Team {
id Int @id @default(autoincrement())
name String
members User[] // Many team members
}

model User {
id Int @id @default(autoincrement())
email String @unique
teams Team[] // Many teams
}

最直接的方法是创建一个团队,然后创建并连接一个用户到该团队

// Create a team
const team = await prisma.team.create({
data: {
name: 'Aurora Adventures',
},
})

// Create a user and assign them to the team
const user = await prisma.user.create({
data: {
email: 'alice@prisma.io',
team: {
connect: {
id: team.id,
},
},
},
})

然而,这段代码有一个问题——考虑以下场景:

  1. 创建团队成功——“Aurora Adventures”现在已被占用
  2. 创建并连接用户失败——团队“Aurora Adventures”存在,但没有用户
  3. 再次执行注册流程并尝试重新创建“Aurora Adventures”失败——团队已存在

创建团队和添加用户应该是一个原子操作,它要么整体成功,要么整体失败

为了在低级数据库客户端中实现原子写入,您必须将插入操作包装在 BEGINCOMMITROLLBACK 语句中。Prisma Client 使用嵌套写入解决了这个问题。以下查询在一个事务中创建团队、创建用户并连接记录

const team = await prisma.team.create({
data: {
name: 'Aurora Adventures',
members: {
create: {
email: 'alice@prisma.io',
},
},
},
})

此外,如果任何时候发生错误,Prisma Client 将回滚整个事务。

嵌套写入常见问题

为什么我不能使用 $transaction([]) API 来解决同样的问题?

$transaction([]) API 不允许您在不同的操作之间传递 ID。在以下示例中,createUserOperation.id 尚不可用

const createUserOperation = prisma.user.create({
data: {
email: 'ebony@prisma.io',
},
})

const createTeamOperation = prisma.team.create({
data: {
name: 'Aurora Adventures',
members: {
connect: {
id: createUserOperation.id, // Not possible, ID not yet available
},
},
},
})

await prisma.$transaction([createUserOperation, createTeamOperation])
嵌套写入支持嵌套更新,但更新不是依赖写入——我应该使用 $transaction([]) API 吗?

正确地说,因为您知道团队的 ID,您可以在 $transaction([]) 中独立更新团队及其团队成员。以下示例在 $transaction([]) 中执行这两个操作

const updateTeam = prisma.team.update({
where: {
id: 1,
},
data: {
name: 'Aurora Adventures Ltd',
},
})

const updateUsers = prisma.user.updateMany({
where: {
teams: {
some: {
id: 1,
},
},
name: {
equals: null,
},
},
data: {
name: 'Unknown User',
},
})

await prisma.$transaction([updateUsers, updateTeam])

然而,您可以使用嵌套写入实现相同的结果

const updateTeam = await prisma.team.update({
where: {
id: 1,
},
data: {
name: 'Aurora Adventures Ltd', // Update team name
members: {
updateMany: {
// Update team members that do not have a name
data: {
name: 'Unknown User',
},
where: {
name: {
equals: null,
},
},
},
},
},
})
我能否执行多个嵌套写入——例如,创建两个新团队并分配用户?

是的,但这结合了场景和技术

  • 创建团队和分配用户是依赖写入——使用嵌套写入
  • 同时创建所有团队和用户是独立写入,因为团队/用户组合 #1 和团队/用户组合 #2 是不相关的写入——使用 $transaction([]) API
// Nested write
const createOne = prisma.team.create({
data: {
name: 'Aurora Adventures',
members: {
create: {
email: 'alice@prisma.io',
},
},
},
})

// Nested write
const createTwo = prisma.team.create({
data: {
name: 'Cool Crew',
members: {
create: {
email: 'elsa@prisma.io',
},
},
},
})

// $transaction([]) API
await prisma.$transaction([createTwo, createOne])

独立写入

如果写入操作不依赖于先前操作的结果,则被视为独立写入。以下几组独立写入可以以任何顺序发生:

  • 将订单列表的状态字段更新为“已发货”
  • 将邮件列表标记为“已读”

注意:如果存在约束,独立写入可能必须按特定顺序发生——例如,如果帖子有一个强制性的 authorId 字段,则必须在博客作者之前删除博客帖子。但是,它们仍然被认为是独立写入,因为没有操作依赖于前一个操作的结果,例如数据库返回生成的 ID。

根据您的要求,Prisma Client 有四种选项来处理应同时成功或失败的独立写入。

批量操作

批量写入允许您在单个事务中写入相同类型的多个记录——如果任何操作失败,Prisma Client 将回滚整个事务。Prisma Client 目前支持

  • createMany()
  • createManyAndReturn()
  • updateMany()
  • updateManyAndReturn()
  • deleteMany()

何时使用批量操作

如果满足以下条件,请考虑使用批量操作作为解决方案:

  • ✔ 您想要更新一批相同类型的记录,例如一批电子邮件

场景:将电子邮件标记为已读

您正在构建一个类似 gmail.com 的服务,您的客户需要一个“标记为已读”功能,允许用户将所有电子邮件标记为已读。对电子邮件状态的每次更新都是独立写入,因为电子邮件彼此不依赖——例如,您阿姨的“生日快乐!🍰”电子邮件与宜家的促销电子邮件无关。

在以下模式中,一个 User 可以有许多收到的电子邮件(一对多关系)

model User {
id Int @id @default(autoincrement())
email String @unique
receivedEmails Email[] // Many emails
}

model Email {
id Int @id @default(autoincrement())
user User @relation(fields: [userId], references: [id])
userId Int
subject String
body String
unread Boolean
}

基于此模式,您可以使用 updateMany 将所有未读电子邮件标记为已读

await prisma.email.updateMany({
where: {
user: {
id: 10,
},
unread: true,
},
data: {
unread: false,
},
})

我能否将嵌套写入与批量操作一起使用?

不能——updateManydeleteMany 目前都不支持嵌套写入。例如,您不能删除多个团队及其所有成员(级联删除)

await prisma.team.deleteMany({
where: {
id: {
in: [2, 99, 2, 11],
},
},
data: {
members: {}, // Cannot access members here
},
})

我能否将批量操作与 $transaction([]) API 一起使用?

是的——例如,您可以在 $transaction([]) 中包含多个 deleteMany 操作。

$transaction([]) API

$transaction([]) API 是解决独立写入的通用解决方案,它允许您将多个操作作为单个原子操作运行——如果任何操作失败,Prisma Client 将回滚整个事务。

值得注意的是,操作按照它们在事务中放置的顺序执行。

await prisma.$transaction([iRunFirst, iRunSecond, iRunThird])

注意:在事务中使用查询不影响查询本身的操作顺序。

随着 Prisma Client 的发展,$transaction([]) API 的用例将越来越多地被更专业的批量操作(如 createMany)和嵌套写入取代。

何时使用 $transaction([]) API

如果满足以下条件,请考虑使用 $transaction([]) API:

  • ✔ 您想要更新包含不同类型记录的批次,例如电子邮件和用户。这些记录不需要以任何方式关联。
  • ✔ 您想要批量执行原始 SQL 查询($executeRaw)——例如,针对 Prisma Client 尚不支持的功能。

场景:隐私立法

GDPR 和其他隐私立法赋予用户要求组织删除所有个人数据的权利。在以下示例模式中,一个 User 可以有许多帖子和私信

model User {
id Int @id @default(autoincrement())
posts Post[]
privateMessages PrivateMessage[]
}

model Post {
id Int @id @default(autoincrement())
user User @relation(fields: [userId], references: [id])
userId Int
title String
content String
}

model PrivateMessage {
id Int @id @default(autoincrement())
user User @relation(fields: [userId], references: [id])
userId Int
message String
}

如果用户行使被遗忘权,我们必须删除三条记录:用户记录、私信和帖子。所有删除操作要么整体成功,要么整体失败,这至关重要,这使其成为事务的用例。但是,在这种情况下,使用单个批量操作(如 deleteMany)是不可能的,因为我们需要跨三个模型删除。相反,我们可以使用 $transaction([]) API 将三个操作一起运行——两个 deleteMany 和一个 delete

const id = 9 // User to be deleted

const deletePosts = prisma.post.deleteMany({
where: {
userId: id,
},
})

const deleteMessages = prisma.privateMessage.deleteMany({
where: {
userId: id,
},
})

const deleteUser = prisma.user.delete({
where: {
id: id,
},
})

await prisma.$transaction([deletePosts, deleteMessages, deleteUser]) // Operations succeed or fail together

场景:预计算 ID 和 $transaction([]) API

$transaction([]) API 不支持依赖写入——如果操作 A 依赖于操作 B 生成的 ID,请使用嵌套写入。但是,如果您预计算 ID(例如,通过生成 GUID),您的写入将变为独立写入。考虑嵌套写入示例中的注册流程

await prisma.team.create({
data: {
name: 'Aurora Adventures',
members: {
create: {
email: 'alice@prisma.io',
},
},
},
})

而不是自动生成 ID,将 TeamUserid 字段更改为 String(如果您不提供值,将自动生成 UUID)。此示例使用 UUID

model Team {
id Int @id @default(autoincrement())
id String @id @default(uuid())
name String
members User[]
}

model User {
id Int @id @default(autoincrement())
id String @id @default(uuid())
email String @unique
teams Team[]
}

重构注册流程示例,以使用 $transaction([]) API 而不是嵌套写入

import { v4 } from 'uuid'

const teamID = v4()
const userID = v4()

await prisma.$transaction([
prisma.user.create({
data: {
id: userID,
email: 'alice@prisma.io',
team: {
id: teamID,
},
},
}),
prisma.team.create({
data: {
id: teamID,
name: 'Aurora Adventures',
},
}),
])

如果喜欢那种语法,技术上仍然可以使用预计算 API 的嵌套写入

import { v4 } from 'uuid'

const teamID = v4()
const userID = v4()

await prisma.team.create({
data: {
id: teamID,
name: 'Aurora Adventures',
members: {
create: {
id: userID,
email: 'alice@prisma.io',
team: {
id: teamID,
},
},
},
},
})

如果您已经在使用自动生成 ID 和嵌套写入,则没有令人信服的理由切换到手动生成 ID 和 $transaction([]) API。

读取、修改、写入

在某些情况下,您可能需要执行自定义逻辑作为原子操作的一部分——也称为读取-修改-写入模式。以下是读取-修改-写入模式的一个示例:

  • 从数据库读取一个值
  • 运行一些逻辑来操作该值(例如,联系外部 API)
  • 将值写回数据库

所有操作都应该同时成功或失败,而不会对数据库进行不必要的更改,但您不一定需要使用实际的数据库事务。本指南的这一部分描述了使用 Prisma Client 和读取-修改-写入模式的两种方法

  • 设计幂等 API
  • 乐观并发控制

幂等 API

幂等性是指使用相同的逻辑和相同的参数多次运行,得到相同结果的能力:对数据库的影响无论是运行一次还是一千次都是相同的。例如

  • 非幂等:在数据库中对电子邮件地址为 "letoya@prisma.io" 的用户执行 upsert(更新或插入)操作。User强制执行唯一的电子邮件地址。如果运行一次(创建一名用户)或运行十次(创建十名用户),对数据库的影响是不同的。
  • 幂等:在数据库中对电子邮件地址为 "letoya@prisma.io" 的用户执行 upsert(更新或插入)操作。User强制执行唯一的电子邮件地址。如果运行一次(创建一名用户)或运行十次(现有用户使用相同的输入进行更新),对数据库的影响是相同的。

幂等性是您应该并且可以在应用程序中尽可能积极设计的东西。

何时设计幂等 API

  • ✔ 您需要能够重试相同的逻辑而不会在数据库中产生不必要的副作用

场景:升级 Slack 团队

您正在为 Slack 创建一个升级流程,允许团队解锁付费功能。团队可以选择不同的计划并按用户、按月付费。您使用 Stripe 作为支付网关,并扩展您的 Team 模型以存储 stripeCustomerId。订阅在 Stripe 中管理。

model Team {
id Int @id @default(autoincrement())
name String
User User[]
stripeCustomerId String?
}

升级流程如下所示

  1. 计算用户数量
  2. 在 Stripe 中创建包含用户数量的订阅
  3. 将团队与 Stripe 客户 ID 相关联以解锁付费功能
const teamId = 9
const planId = 'plan_id'

// Count team members
const numTeammates = await prisma.user.count({
where: {
teams: {
some: {
id: teamId,
},
},
},
})

// Create a customer in Stripe for plan-9454549
const customer = await stripe.customers.create({
externalId: teamId,
plan: planId,
quantity: numTeammates,
})

// Update the team with the customer id to indicate that they are a customer
// and support querying this customer in Stripe from our application code.
await prisma.team.update({
data: {
customerId: customer.id,
},
where: {
id: teamId,
},
})

这个例子有一个问题:您只能运行一次逻辑。考虑以下场景:

  1. Stripe 创建一个新的客户和订阅,并返回一个客户 ID

  2. 更新团队失败——团队未在 Slack 数据库中标记为客户

  3. 客户被 Stripe 收费,但由于团队缺乏有效的 customerId,Slack 中未解锁付费功能

  4. 再次运行相同的代码会

    • 导致错误,因为团队(由 externalId 定义)已存在——Stripe 从不返回客户 ID
    • 如果 externalId 不受唯一约束,Stripe 会创建另一个订阅(非幂等

如果发生错误,您无法重新运行此代码,也无法更改为其他计划而不会被收取两次费用。

以下重构(突出显示)引入了一种机制,该机制检查订阅是否已存在,然后创建描述或更新现有订阅(如果输入相同,则保持不变)

// Calculate the number of users times the cost per user
const numTeammates = await prisma.user.count({
where: {
teams: {
some: {
id: teamId,
},
},
},
})

// Find customer in Stripe
let customer = await stripe.customers.get({ externalId: teamID })

if (customer) {
// If team already exists, update
customer = await stripe.customers.update({
externalId: teamId,
plan: 'plan_id',
quantity: numTeammates,
})
} else {
customer = await stripe.customers.create({
// If team does not exist, create customer
externalId: teamId,
plan: 'plan_id',
quantity: numTeammates,
})
}

// Update the team with the customer id to indicate that they are a customer
// and support querying this customer in Stripe from our application code.
await prisma.team.update({
data: {
customerId: customer.id,
},
where: {
id: teamId,
},
})

您现在可以使用相同的输入多次重试相同的逻辑,而不会产生不良影响。为了进一步增强此示例,您可以引入一种机制,如果更新在设定次数的尝试后仍未成功,则取消或暂时停用订阅。

乐观并发控制

乐观并发控制(OCC)是一种处理单个实体上并发操作的模型,它不依赖于 🔒 锁定。相反,我们乐观地假设记录在读取和写入之间保持不变,并使用并发令牌(时间戳或版本字段)来检测记录的更改。

如果发生 ❌ 冲突(自您读取记录后,其他人已更改该记录),则取消事务。根据您的场景,您可以

  • 重试事务(预订另一个电影座位)
  • 抛出错误(提醒用户他们即将覆盖其他人所做的更改)

本节介绍如何构建您自己的乐观并发控制。另请参阅:GitHub 上应用级乐观并发控制的计划

信息
  • 如果您使用版本 4.4.0 或更早版本,则无法在 update 操作上使用乐观并发控制,因为您无法过滤非唯一字段。您需要与乐观并发控制一起使用的 version 字段是非唯一字段。

  • 从版本 5.0.0 开始,您可以在 update 操作中过滤非唯一字段,以便使用乐观并发控制。该功能在版本 4.5.0 到 4.16.2 期间也通过预览标志 extendedWhereUnique 提供。

何时使用乐观并发控制

  • ✔ 您预计会有大量并发请求(多人预订电影座位)
  • ✔ 您预计这些并发请求之间的冲突很少发生

在高并发应用程序中避免锁定,可以使应用程序更能承受负载并整体上更具可扩展性。尽管锁定本身并不是坏事,但在高并发环境中锁定可能会导致意想不到的后果——即使您只锁定单个行并且只锁定很短的时间。有关更多信息,请参阅

场景:预订电影座位

您正在为电影院创建一个预订系统。每部电影都有一定数量的座位。以下模式对电影和座位进行建模

model Seat {
id Int @id @default(autoincrement())
userId Int?
claimedBy User? @relation(fields: [userId], references: [id])
movieId Int
movie Movie @relation(fields: [movieId], references: [id])
}

model Movie {
id Int @id @default(autoincrement())
name String @unique
seats Seat[]
}

以下示例代码查找第一个可用座位并将该座位分配给用户

const movieName = 'Hidden Figures'

// Find first available seat
const availableSeat = await prisma.seat.findFirst({
where: {
movie: {
name: movieName,
},
claimedBy: null,
},
})

// Throw an error if no seats are available
if (!availableSeat) {
throw new Error(`Oh no! ${movieName} is all booked.`)
}

// Claim the seat
await prisma.seat.update({
data: {
claimedBy: userId,
},
where: {
id: availableSeat.id,
},
})

然而,这段代码存在“重复预订问题”——两个人可能会预订同一个座位

  1. 座位 3A 返回给 Sorcha (findFirst)
  2. 座位 3A 返回给 Ellen (findFirst)
  3. 座位 3A 被 Sorcha 占用 (update)
  4. 座位 3A 被 Ellen 占用 (update - 覆盖 Sorcha 的占用)

尽管 Sorcha 成功预订了座位,但系统最终存储了 Ellen 的预订。为了使用乐观并发控制解决此问题,请在座位中添加一个 version 字段

model Seat {
id Int @id @default(autoincrement())
userId Int?
claimedBy User? @relation(fields: [userId], references: [id])
movieId Int
movie Movie @relation(fields: [movieId], references: [id])
version Int
}

接下来,调整代码以在更新前检查 version 字段

const userEmail = 'alice@prisma.io'
const movieName = 'Hidden Figures'

// Find the first available seat
// availableSeat.version might be 0
const availableSeat = await client.seat.findFirst({
where: {
Movie: {
name: movieName,
},
claimedBy: null,
},
})

if (!availableSeat) {
throw new Error(`Oh no! ${movieName} is all booked.`)
}

// Only mark the seat as claimed if the availableSeat.version
// matches the version we're updating. Additionally, increment the
// version when we perform this update so all other clients trying
// to book this same seat will have an outdated version.
const seats = await client.seat.updateMany({
data: {
claimedBy: userEmail,
version: {
increment: 1,
},
},
where: {
id: availableSeat.id,
version: availableSeat.version, // This version field is the key; only claim seat if in-memory version matches database version, indicating that the field has not been updated
},
})

if (seats.count === 0) {
throw new Error(`That seat is already booked! Please try again.`)
}

现在两个人不可能预订同一个座位了

  1. 座位 3A 返回给 Sorcha (version 为 0)
  2. 座位 3A 返回给 Ellen (version 为 0)
  3. 座位 3A 被 Sorcha 占用 (version 递增到 1,预订成功)
  4. 座位 3A 被 Ellen 占用 (内存中的 version (0) 与数据库中的 version (1) 不匹配 - 预订不成功)

交互式事务

如果您有一个现有应用程序,将您的应用程序重构为使用乐观并发控制可能是一项重大的任务。交互式事务为此类情况提供了一个有用的应急方案。

要创建交互式事务,请将异步函数传递给$transaction

传递给此异步函数的第一个参数是 Prisma Client 实例。下面,我们将此实例称为 tx。在此 tx 实例上调用的任何 Prisma Client 调用都封装在事务中。

在下面的示例中,爱丽丝和鲍勃的账户中各有 100 美元。如果他们尝试发送超过他们拥有的钱,转账将被拒绝。

预期的结果是爱丽丝进行 1 次 100 美元的转账,而另一次转账将被拒绝。这将导致爱丽丝拥有 0 美元,鲍勃拥有 200 美元。

import { PrismaClient } from '../prisma/generated/client'
const prisma = new PrismaClient()

async function transfer(from: string, to: string, amount: number) {
return await prisma.$transaction(async (tx) => {
// 1. Decrement amount from the sender.
const sender = await tx.account.update({
data: {
balance: {
decrement: amount,
},
},
where: {
email: from,
},
})

// 2. Verify that the sender's balance didn't go below zero.
if (sender.balance < 0) {
throw new Error(`${from} doesn't have enough to send ${amount}`)
}

// 3. Increment the recipient's balance by amount
const recipient = tx.account.update({
data: {
balance: {
increment: amount,
},
},
where: {
email: to,
},
})

return recipient
})
}

async function main() {
// This transfer is successful
await transfer('alice@prisma.io', 'bob@prisma.io', 100)
// This transfer fails because Alice doesn't have enough funds in her account
await transfer('alice@prisma.io', 'bob@prisma.io', 100)
}

main()

在上面的示例中,两个 update 查询都在数据库事务中运行。当应用程序到达函数末尾时,事务将提交到数据库。

如果应用程序在此过程中遇到错误,异步函数将抛出异常并自动回滚事务。

您可以在此部分了解有关交互式事务的更多信息。

警告

谨慎使用交互式事务。长时间保持事务打开会损害数据库性能,甚至可能导致死锁。尽量避免在事务函数中执行网络请求和慢查询。我们建议您尽快进出!

结论

Prisma Client 支持多种处理事务的方式,无论是直接通过 API 还是通过支持您在应用程序中引入乐观并发控制和幂等性。如果您觉得您的应用程序中有任何未涵盖的用例,请打开一个GitHub issue 来开始讨论。

© . This site is unofficial and not affiliated with Prisma Data, Inc.