feat(api-sync): implement sync pull endpoint with cursor

This commit is contained in:
2026-04-06 01:03:12 +08:00
parent ecf0d9ff03
commit 661788ae75
4 changed files with 416 additions and 41 deletions
+16
View File
@@ -0,0 +1,16 @@
import { Type } from "class-transformer";
import { IsInt, IsOptional, IsString, Max, MaxLength, Min } from "class-validator";
export class SyncPullQueryDto {
@IsOptional()
@IsString()
@MaxLength(512)
cursor?: string;
@Type(() => Number)
@IsOptional()
@IsInt()
@Min(1)
@Max(200)
limit?: number;
}
+11 -2
View File
@@ -1,11 +1,20 @@
import { Body, Controller, Headers, Post, UnauthorizedException } from "@nestjs/common";
import { Body, Controller, Get, Headers, Post, Query, UnauthorizedException } from "@nestjs/common";
import { SyncPullQueryDto } from "./dto/sync-pull.dto";
import { SyncPushDto } from "./dto/sync-push.dto";
import { SyncPushResponse, SyncService } from "./sync.service";
import { SyncPullResponse, SyncPushResponse, SyncService } from "./sync.service";
@Controller("sync")
export class SyncController {
constructor(private readonly syncService: SyncService) {}
@Get("pull")
async pullOperations(
@Headers("x-user-id") userIdHeader: string | string[] | undefined,
@Query() query: SyncPullQueryDto
): Promise<SyncPullResponse> {
return this.syncService.pullOperations(this.resolveUserId(userIdHeader), query);
}
@Post("push")
async pushOperations(
@Headers("x-user-id") userIdHeader: string | string[] | undefined,
+165 -1
View File
@@ -1,6 +1,7 @@
import { Injectable } from "@nestjs/common";
import { BadRequestException, Injectable } from "@nestjs/common";
import { Prisma } from "../../generated/prisma/client";
import { PrismaService } from "../prisma/prisma.service";
import { SyncPullQueryDto } from "./dto/sync-pull.dto";
import { SyncPushDto, SyncPushOperationDto } from "./dto/sync-push.dto";
export type SyncPushItemStatus = "accepted" | "duplicate" | "failed";
@@ -24,10 +25,79 @@ type ExistingOperationRecord = {
serverTs: Date;
};
type SyncPullCursorState = {
serverTs: string;
opId: string;
};
type SyncPullOperationRecord = {
opId: string;
entityId: string;
entityType: string;
action: string;
payload: Prisma.JsonValue | null;
clientTs: Date;
deviceId: string;
serverTs: Date;
};
export type SyncPullItem = {
opId: string;
entityId: string;
entityType: string;
action: string;
payload: string | null;
clientTs: number;
deviceId: string;
serverTs: string;
};
export type SyncPullResponse = {
items: SyncPullItem[];
nextCursor: string | null;
hasMore: boolean;
};
@Injectable()
export class SyncService {
constructor(private readonly prismaService: PrismaService) {}
async pullOperations(userId: string, query: SyncPullQueryDto): Promise<SyncPullResponse> {
const limit = query.limit ?? 100;
const cursor = this.parseCursor(query.cursor);
const operations = (await this.prismaService.syncOperation.findMany({
where: this.buildPullWhereInput(userId, cursor),
orderBy: [{ serverTs: "asc" }, { opId: "asc" }],
take: limit + 1,
select: {
opId: true,
entityId: true,
entityType: true,
action: true,
payload: true,
clientTs: true,
deviceId: true,
serverTs: true
}
})) as SyncPullOperationRecord[];
const hasMore = operations.length > limit;
const pageItems = hasMore ? operations.slice(0, limit) : operations;
const lastOperation = pageItems.at(-1);
return {
items: pageItems.map((operation) => this.serializePullItem(operation)),
nextCursor: lastOperation
? this.encodeCursor({
serverTs: lastOperation.serverTs.toISOString(),
opId: lastOperation.opId
})
: (query.cursor ?? null),
hasMore
};
}
async pushOperations(userId: string, body: SyncPushDto): Promise<SyncPushResponse> {
const existingOperations = await this.loadExistingOperations(userId, body.operations);
const results: SyncPushItemResult[] = [];
@@ -139,6 +209,100 @@ export class SyncService {
);
}
private buildPullWhereInput(
userId: string,
cursor: SyncPullCursorState | null
): Prisma.SyncOperationWhereInput {
if (!cursor) {
return { userId };
}
const cursorDate = new Date(cursor.serverTs);
return {
userId,
// 同一毫秒内可能有多条操作,必须使用 opId 作为二级游标来保证稳定分页。
OR: [
{
serverTs: {
gt: cursorDate
}
},
{
serverTs: cursorDate,
opId: {
gt: cursor.opId
}
}
]
};
}
private serializePullItem(operation: SyncPullOperationRecord): SyncPullItem {
return {
opId: operation.opId,
entityId: operation.entityId,
entityType: operation.entityType,
action: operation.action,
payload: this.serializePayload(operation.payload),
clientTs: operation.clientTs.getTime(),
deviceId: operation.deviceId,
serverTs: operation.serverTs.toISOString()
};
}
private serializePayload(payload: Prisma.JsonValue | null): string | null {
if (payload === null) {
return null;
}
if (typeof payload === "string") {
return payload;
}
return JSON.stringify(payload);
}
private parseCursor(cursor: string | undefined): SyncPullCursorState | null {
if (!cursor) {
return null;
}
let decodedCursor: unknown;
try {
decodedCursor = JSON.parse(Buffer.from(cursor, "base64url").toString("utf8"));
} catch {
throw new BadRequestException("Invalid sync cursor");
}
if (typeof decodedCursor !== "object" || decodedCursor === null) {
throw new BadRequestException("Invalid sync cursor");
}
const cursorRecord = decodedCursor as {
serverTs?: unknown;
opId?: unknown;
};
if (
typeof cursorRecord.serverTs !== "string" ||
typeof cursorRecord.opId !== "string" ||
Number.isNaN(Date.parse(cursorRecord.serverTs)) ||
cursorRecord.opId.trim().length === 0
) {
throw new BadRequestException("Invalid sync cursor");
}
return {
serverTs: cursorRecord.serverTs,
opId: cursorRecord.opId
};
}
private encodeCursor(cursor: SyncPullCursorState): string {
return Buffer.from(JSON.stringify(cursor), "utf8").toString("base64url");
}
private isDuplicateOpIdError(error: unknown): boolean {
if (!(error instanceof Prisma.PrismaClientKnownRequestError)) {
return false;
+224 -38
View File
@@ -13,54 +13,126 @@ type SyncOperationRecord = {
entityType: string;
entityId: string;
action: string;
payload?: string;
payload: string | null;
clientTs: Date;
serverTs: Date;
};
type SyncOperationSelect = {
opId?: true;
entityId?: true;
entityType?: true;
action?: true;
payload?: true;
clientTs?: true;
deviceId?: true;
serverTs?: true;
};
type SyncOperationFindManyArgs = {
where: {
userId: string;
opId?: {
in: string[];
};
OR?: Array<
| {
serverTs: {
gt: Date;
};
}
| {
serverTs: Date;
opId: {
gt: string;
};
}
>;
};
select: SyncOperationSelect;
orderBy?: Array<{
serverTs?: "asc" | "desc";
opId?: "asc" | "desc";
}>;
take?: number;
};
type SyncOperationCreateArgs = {
data: {
opId: string;
userId: string;
deviceId: string;
entityType: string;
entityId: string;
action: string;
payload?: string;
clientTs: Date;
};
select: {
opId: true;
serverTs: true;
};
};
class InMemoryPrismaService {
private syncOperationIdSequence = 1;
private syncOperations: SyncOperationRecord[] = [];
readonly syncOperation = {
findMany: async (args: {
where: {
userId: string;
opId: {
in: string[];
};
};
select: {
opId: true;
serverTs: true;
};
}) => {
return this.syncOperations
.filter(
(item) => item.userId === args.where.userId && args.where.opId.in.includes(item.opId)
)
.map((item) => ({
opId: item.opId,
serverTs: item.serverTs
}));
findMany: async (args: SyncOperationFindManyArgs) => {
let items = this.syncOperations.filter((item) => item.userId === args.where.userId);
if (args.where.opId?.in) {
items = items.filter((item) => args.where.opId?.in.includes(item.opId));
}
if (args.where.OR && args.where.OR.length > 0) {
items = items.filter((item) =>
args.where.OR?.some((condition) => {
if ("gt" in condition.serverTs) {
return item.serverTs.getTime() > condition.serverTs.gt.getTime();
}
if ("opId" in condition) {
return (
item.serverTs.getTime() === condition.serverTs.getTime() &&
item.opId > condition.opId.gt
);
}
return false;
})
);
}
if (args.orderBy && args.orderBy.length > 0) {
items = [...items].sort((left, right) => {
for (const orderRule of args.orderBy ?? []) {
if (orderRule.serverTs) {
const diff = left.serverTs.getTime() - right.serverTs.getTime();
if (diff !== 0) {
return orderRule.serverTs === "asc" ? diff : -diff;
}
}
if (orderRule.opId) {
const diff = left.opId.localeCompare(right.opId);
if (diff !== 0) {
return orderRule.opId === "asc" ? diff : -diff;
}
}
}
return 0;
});
}
const limitedItems = args.take ? items.slice(0, args.take) : items;
return limitedItems.map((item) => this.pickSelectedFields(item, args.select));
},
create: async (args: {
data: {
opId: string;
userId: string;
deviceId: string;
entityType: string;
entityId: string;
action: string;
payload?: string;
clientTs: Date;
};
select: {
opId: true;
serverTs: true;
};
}) => {
create: async (args: SyncOperationCreateArgs) => {
const createdOperation: SyncOperationRecord = {
id: `sync_${this.syncOperationIdSequence++}`,
opId: args.data.opId,
@@ -69,7 +141,7 @@ class InMemoryPrismaService {
entityType: args.data.entityType,
entityId: args.data.entityId,
action: args.data.action,
payload: args.data.payload,
payload: args.data.payload ?? null,
clientTs: args.data.clientTs,
serverTs: new Date()
};
@@ -86,6 +158,33 @@ class InMemoryPrismaService {
getOperationCount(): number {
return this.syncOperations.length;
}
seedOperations(records: Array<Omit<SyncOperationRecord, "id">>): void {
for (const record of records) {
this.syncOperations.push({
...record,
id: `sync_${this.syncOperationIdSequence++}`
});
}
}
private pickSelectedFields(
item: SyncOperationRecord,
select: SyncOperationSelect
): Partial<SyncOperationRecord> {
const result: Record<string, unknown> = {};
for (const key of Object.keys(select) as Array<keyof SyncOperationSelect>) {
if (!select[key]) {
continue;
}
const recordKey = key as keyof SyncOperationRecord;
result[recordKey] = item[recordKey];
}
return result as Partial<SyncOperationRecord>;
}
}
describe("SyncController (integration)", () => {
@@ -230,4 +329,91 @@ describe("SyncController (integration)", () => {
);
expect(prismaService.getOperationCount()).toBe(3);
});
it("should pull operations incrementally with a stable cursor", async () => {
prismaService.seedOperations([
{
opId: "pull-op-1",
userId: "user-pull",
deviceId: "device-c",
entityType: "TASK",
entityId: "task-10",
action: "CREATE",
payload: '{"title":"任务甲"}',
clientTs: new Date("2026-04-06T10:00:00.000Z"),
serverTs: new Date("2026-04-06T10:10:00.000Z")
},
{
opId: "pull-op-2",
userId: "user-pull",
deviceId: "device-c",
entityType: "TASK",
entityId: "task-10",
action: "UPDATE",
payload: '{"title":"任务甲-更新"}',
clientTs: new Date("2026-04-06T10:01:00.000Z"),
serverTs: new Date("2026-04-06T10:10:00.000Z")
},
{
opId: "pull-op-3",
userId: "user-pull",
deviceId: "device-c",
entityType: "TASK",
entityId: "task-11",
action: "CREATE",
payload: '{"title":"任务乙"}',
clientTs: new Date("2026-04-06T10:02:00.000Z"),
serverTs: new Date("2026-04-06T10:11:00.000Z")
},
{
opId: "pull-op-other-user",
userId: "user-other",
deviceId: "device-z",
entityType: "TASK",
entityId: "task-99",
action: "CREATE",
payload: '{"title":"其他用户任务"}',
clientTs: new Date("2026-04-06T10:03:00.000Z"),
serverTs: new Date("2026-04-06T10:12:00.000Z")
}
]);
const firstResponse = await request(app.getHttpServer())
.get("/sync/pull")
.set("x-user-id", "user-pull")
.query({ limit: 2 })
.expect(200);
expect(firstResponse.body.items.map((item: { opId: string }) => item.opId)).toEqual([
"pull-op-1",
"pull-op-2"
]);
expect(firstResponse.body.hasMore).toBe(true);
expect(firstResponse.body.nextCursor).toEqual(expect.any(String));
const secondResponse = await request(app.getHttpServer())
.get("/sync/pull")
.set("x-user-id", "user-pull")
.query({
limit: 2,
cursor: firstResponse.body.nextCursor
})
.expect(200);
expect(secondResponse.body.items.map((item: { opId: string }) => item.opId)).toEqual([
"pull-op-3"
]);
expect(secondResponse.body.hasMore).toBe(false);
expect(secondResponse.body.nextCursor).toEqual(expect.any(String));
});
it("should reject invalid cursor payload", async () => {
await request(app.getHttpServer())
.get("/sync/pull")
.set("x-user-id", "user-invalid-cursor")
.query({
cursor: "not-a-valid-cursor"
})
.expect(400);
});
});