Make workflow statuses more resilient (#12263)

Workflow statuses are often broken. I did not figured out why yet. But I
see two causes that can be fixed:
- statuses calculation are really complicated today, just to spare a
call to the database
- job is not indempotent, it is using the combination of the previous
statuses + the update to calculate the new statuses. Which means that
once broken, next updates will be broken as well

Instead, we now:
- fetch workflow versions
- get the statuses from these.
It simplifies the code and make the job indempotent.
This commit is contained in:
Thomas Trompette
2025-05-26 15:05:18 +02:00
committed by GitHub
parent 742af7884a
commit ec4e4740d2
7 changed files with 168 additions and 367 deletions

View File

@ -1,19 +0,0 @@
import { WorkflowStatus } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
export const ACTIVE_AND_DRAFT_STATUSES = [
WorkflowStatus.ACTIVE,
WorkflowStatus.DRAFT,
];
export const DEACTIVATED_AND_DRAFT_STATUSES = [
WorkflowStatus.DEACTIVATED,
WorkflowStatus.DRAFT,
];
export const ACTIVE_STATUSES = [WorkflowStatus.ACTIVE];
export const DEACTIVATED_STATUSES = [WorkflowStatus.DEACTIVATED];
export const DRAFT_STATUSES = [WorkflowStatus.DRAFT];
export const NO_STATUSES = [];

View File

@ -1,8 +0,0 @@
export enum WorkflowStatusCombination {
ACTIVE = 'ACTIVE',
DRAFT = 'DRAFT',
DEACTIVATED = 'DEACTIVATED',
ACTIVE_AND_DRAFT = 'ACTIVE_AND_DRAFT',
DEACTIVATED_AND_DRAFT = 'DEACTIVATED_AND_DRAFT',
NO_STATUSES = 'NO_STATUSES',
}

View File

@ -21,12 +21,28 @@ describe('WorkflowStatusesUpdate', () => {
update: jest.fn(),
};
const mockWorkflowVersionRepository = {
findOneOrFail: jest.fn(),
find: jest.fn(),
update: jest.fn(),
};
const mockTwentyORMManager = {
getRepository: jest.fn().mockResolvedValue(mockWorkflowRepository),
getRepository: jest.fn().mockImplementation((entity) => {
if (entity === 'workflow') {
return Promise.resolve(mockWorkflowRepository);
}
if (entity === 'workflowVersion') {
return Promise.resolve(mockWorkflowVersionRepository);
}
return Promise.resolve(null);
}),
};
const mockServerlessFunctionService = {
publishOneServerlessFunction: jest.fn(),
findOneOrFail: jest.fn(),
};
const mockWorkspaceEventEmitter = {
@ -79,10 +95,14 @@ describe('WorkflowStatusesUpdate', () => {
};
const mockWorkflow = {
id: '1',
statuses: [WorkflowStatus.DRAFT],
};
mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow);
mockWorkflowVersionRepository.find.mockResolvedValue([
{ status: WorkflowVersionStatus.DRAFT },
]);
await job.handle(event);
@ -106,13 +126,17 @@ describe('WorkflowStatusesUpdate', () => {
};
mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow);
mockWorkflowVersionRepository.find.mockResolvedValue([
{ status: WorkflowVersionStatus.ACTIVE },
{ status: WorkflowVersionStatus.DRAFT },
]);
await job.handle(event);
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1);
expect(mockWorkflowRepository.update).toHaveBeenCalledWith(
{ id: '1' },
{ statuses: [WorkflowStatus.ACTIVE, WorkflowStatus.DRAFT] },
{ statuses: [WorkflowStatus.DRAFT, WorkflowStatus.ACTIVE] },
);
expect(
mockWorkspaceEventEmitter.emitDatabaseBatchEvent,
@ -136,114 +160,37 @@ describe('WorkflowStatusesUpdate', () => {
};
const mockWorkflow = {
id: '1',
statuses: [WorkflowStatus.ACTIVE],
};
const mockWorkflowVersion = {
id: '1',
status: WorkflowVersionStatus.ACTIVE,
steps: [],
};
mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow);
mockWorkflowVersionRepository.findOneOrFail.mockResolvedValue(
mockWorkflowVersion,
);
mockWorkflowVersionRepository.find.mockResolvedValue([
{ status: WorkflowVersionStatus.ACTIVE },
]);
await job.handle(event);
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2);
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1);
expect(
mockWorkflowVersionRepository.findOneOrFail,
).toHaveBeenCalledTimes(1);
expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0);
expect(
mockWorkspaceEventEmitter.emitDatabaseBatchEvent,
).toHaveBeenCalledTimes(0);
});
test('when update that should be impossible, do not do anything', async () => {
const event: WorkflowVersionBatchEvent = {
workspaceId: '1',
type: WorkflowVersionEventType.STATUS_UPDATE,
statusUpdates: [
{
workflowId: '1',
workflowVersionId: '1',
previousStatus: WorkflowVersionStatus.ACTIVE,
newStatus: WorkflowVersionStatus.DRAFT,
},
],
};
const mockWorkflow = {
statuses: [WorkflowStatus.ACTIVE],
};
mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow);
await job.handle(event);
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2);
expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0);
expect(
mockWorkspaceEventEmitter.emitDatabaseBatchEvent,
).toHaveBeenCalledTimes(0);
});
test('when WorkflowVersionStatus.DEACTIVATED to WorkflowVersionStatus.ACTIVE, should activate', async () => {
const event: WorkflowVersionBatchEvent = {
workspaceId: '1',
type: WorkflowVersionEventType.STATUS_UPDATE,
statusUpdates: [
{
workflowId: '1',
workflowVersionId: '1',
previousStatus: WorkflowVersionStatus.DEACTIVATED,
newStatus: WorkflowVersionStatus.ACTIVE,
},
],
};
const mockWorkflow = {
statuses: [WorkflowStatus.DEACTIVATED],
};
mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow);
await job.handle(event);
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2);
expect(mockWorkflowRepository.update).toHaveBeenCalledWith(
{ id: '1' },
{ statuses: [WorkflowStatus.ACTIVE] },
);
expect(
mockWorkspaceEventEmitter.emitDatabaseBatchEvent,
).toHaveBeenCalledTimes(1);
});
test('when WorkflowVersionStatus.ACTIVE to WorkflowVersionStatus.DEACTIVATED, should deactivate', async () => {
const event: WorkflowVersionBatchEvent = {
workspaceId: '1',
type: WorkflowVersionEventType.STATUS_UPDATE,
statusUpdates: [
{
workflowId: '1',
workflowVersionId: '1',
previousStatus: WorkflowVersionStatus.ACTIVE,
newStatus: WorkflowVersionStatus.DEACTIVATED,
},
],
};
const mockWorkflow = {
statuses: [WorkflowStatus.ACTIVE],
};
mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow);
await job.handle(event);
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2);
expect(mockWorkflowRepository.update).toHaveBeenCalledWith(
{ id: '1' },
{ statuses: [WorkflowStatus.DEACTIVATED] },
);
expect(
mockWorkspaceEventEmitter.emitDatabaseBatchEvent,
).toHaveBeenCalledTimes(1);
});
test('when WorkflowVersionStatus.DRAFT to WorkflowVersionStatus.ACTIVE, should activate', async () => {
test('when WorkflowVersionStatus.DRAFT to WorkflowVersionStatus.ACTIVE, should activate and publish serverless functions', async () => {
const event: WorkflowVersionBatchEvent = {
workspaceId: '1',
type: WorkflowVersionEventType.STATUS_UPDATE,
@ -258,14 +205,63 @@ describe('WorkflowStatusesUpdate', () => {
};
const mockWorkflow = {
id: '1',
statuses: [WorkflowStatus.DRAFT],
};
const mockWorkflowVersion = {
id: '1',
status: WorkflowVersionStatus.ACTIVE,
steps: [
{
type: 'CODE',
settings: {
input: {
serverlessFunctionId: 'serverless-1',
},
},
},
],
};
const mockServerlessFunction = {
id: 'serverless-1',
latestVersion: 'v2',
};
mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow);
mockWorkflowVersionRepository.findOneOrFail.mockResolvedValue(
mockWorkflowVersion,
);
mockWorkflowVersionRepository.find.mockResolvedValue([
{ status: WorkflowVersionStatus.ACTIVE },
]);
mockServerlessFunctionService.findOneOrFail.mockResolvedValue(
mockServerlessFunction,
);
await job.handle(event);
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2);
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1);
expect(
mockWorkflowVersionRepository.findOneOrFail,
).toHaveBeenCalledTimes(1);
expect(
mockServerlessFunctionService.publishOneServerlessFunction,
).toHaveBeenCalledWith('serverless-1', '1');
expect(mockWorkflowVersionRepository.update).toHaveBeenCalledWith('1', {
steps: [
{
type: 'CODE',
settings: {
input: {
serverlessFunctionId: 'serverless-1',
serverlessFunctionVersion: 'v2',
},
},
},
],
});
expect(mockWorkflowRepository.update).toHaveBeenCalledWith(
{ id: '1' },
{ statuses: [WorkflowStatus.ACTIVE] },
@ -285,10 +281,14 @@ describe('WorkflowStatusesUpdate', () => {
};
const mockWorkflow = {
id: '1',
statuses: [WorkflowStatus.ACTIVE],
};
mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow);
mockWorkflowVersionRepository.find.mockResolvedValue([
{ status: WorkflowVersionStatus.ACTIVE },
]);
await job.handle(event);
@ -304,10 +304,12 @@ describe('WorkflowStatusesUpdate', () => {
};
const mockWorkflow = {
id: '1',
statuses: [WorkflowStatus.DRAFT],
};
mockWorkflowRepository.findOneOrFail.mockResolvedValue(mockWorkflow);
mockWorkflowVersionRepository.find.mockResolvedValue([]);
await job.handle(event);

View File

@ -1,8 +1,9 @@
import { Logger, Scope } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import isEqual from 'lodash.isequal';
import { isDefined } from 'twenty-shared/utils';
import { Repository } from 'typeorm';
import { In, Repository } from 'typeorm';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
@ -26,9 +27,6 @@ import {
WorkflowAction,
WorkflowActionType,
} from 'src/modules/workflow/workflow-executor/workflow-actions/types/workflow-action.type';
import { getStatusCombinationFromArray } from 'src/modules/workflow/workflow-status/utils/get-status-combination-from-array.util';
import { getStatusCombinationFromUpdate } from 'src/modules/workflow/workflow-status/utils/get-status-combination-from-update.util';
import { getWorkflowStatusesFromCombination } from 'src/modules/workflow/workflow-status/utils/get-statuses-from-combination.util';
export enum WorkflowVersionEventType {
CREATE = 'CREATE',
@ -90,9 +88,10 @@ export class WorkflowStatusesUpdateJob {
switch (event.type) {
case WorkflowVersionEventType.CREATE:
case WorkflowVersionEventType.DELETE:
await Promise.all(
event.workflowIds.map((workflowId) =>
this.handleWorkflowVersionCreated({
this.handleWorkflowVersionCreatedOrDeleted({
workflowId,
workflowObjectMetadata,
workspaceId: event.workspaceId,
@ -111,23 +110,12 @@ export class WorkflowStatusesUpdateJob {
),
);
break;
case WorkflowVersionEventType.DELETE:
await Promise.all(
event.workflowIds.map((workflowId) =>
this.handleWorkflowVersionDeleted({
workflowId,
workflowObjectMetadata,
workspaceId: event.workspaceId,
}),
),
);
break;
default:
break;
}
}
private async handleWorkflowVersionCreated({
private async handleWorkflowVersionCreatedOrDeleted({
workflowId,
workflowObjectMetadata,
workspaceId,
@ -141,30 +129,26 @@ export class WorkflowStatusesUpdateJob {
'workflow',
);
const workflowVersionRepository =
await this.twentyORMManager.getRepository<WorkflowVersionWorkspaceEntity>(
'workflowVersion',
);
const newWorkflowStatuses = await this.getWorkflowStatuses({
workflowId,
workflowVersionRepository,
});
const previousWorkflow = await workflowRepository.findOneOrFail({
where: {
id: workflowId,
},
});
const currentWorkflowStatusCombination = getStatusCombinationFromArray(
previousWorkflow.statuses || [],
);
const newWorkflowStatusCombination = getStatusCombinationFromUpdate(
currentWorkflowStatusCombination,
undefined,
WorkflowVersionStatus.DRAFT,
);
if (newWorkflowStatusCombination === currentWorkflowStatusCombination) {
if (isEqual(newWorkflowStatuses, previousWorkflow.statuses)) {
return;
}
const newWorkflowStatuses = getWorkflowStatusesFromCombination(
newWorkflowStatusCombination,
);
await workflowRepository.update(
{
id: workflowId,
@ -285,24 +269,15 @@ export class WorkflowStatusesUpdateJob {
statusUpdate,
});
const currentWorkflowStatusCombination = getStatusCombinationFromArray(
workflow.statuses || [],
);
const newWorkflowStatuses = await this.getWorkflowStatuses({
workflowId: statusUpdate.workflowId,
workflowVersionRepository,
});
const newWorkflowStatusCombination = getStatusCombinationFromUpdate(
currentWorkflowStatusCombination,
statusUpdate.previousStatus,
statusUpdate.newStatus,
);
if (newWorkflowStatusCombination === currentWorkflowStatusCombination) {
if (isEqual(newWorkflowStatuses, workflow.statuses)) {
return;
}
const newWorkflowStatuses = getWorkflowStatusesFromCombination(
newWorkflowStatusCombination,
);
await workflowRepository.update(
{
id: statusUpdate.workflowId,
@ -320,61 +295,6 @@ export class WorkflowStatusesUpdateJob {
});
}
private async handleWorkflowVersionDeleted({
workflowId,
workflowObjectMetadata,
workspaceId,
}: {
workflowId: string;
workflowObjectMetadata: ObjectMetadataEntity;
workspaceId: string;
}): Promise<void> {
const workflowRepository =
await this.twentyORMManager.getRepository<WorkflowWorkspaceEntity>(
'workflow',
);
const workflow = await workflowRepository.findOneOrFail({
where: {
id: workflowId,
},
});
const currentWorkflowStatusCombination = getStatusCombinationFromArray(
workflow.statuses || [],
);
const newWorkflowStatusCombination = getStatusCombinationFromUpdate(
currentWorkflowStatusCombination,
WorkflowVersionStatus.DRAFT,
undefined,
);
if (newWorkflowStatusCombination === currentWorkflowStatusCombination) {
return;
}
const newWorkflowStatuses = getWorkflowStatusesFromCombination(
newWorkflowStatusCombination,
);
await workflowRepository.update(
{
id: workflowId,
},
{
statuses: newWorkflowStatuses,
},
);
this.emitWorkflowStatusUpdatedEvent({
currentWorkflow: workflow,
workflowObjectMetadata,
newWorkflowStatuses,
workspaceId,
});
}
private emitWorkflowStatusUpdatedEvent({
currentWorkflow,
workflowObjectMetadata,
@ -412,4 +332,51 @@ export class WorkflowStatusesUpdateJob {
workspaceId,
});
}
private async getWorkflowStatuses({
workflowId,
workflowVersionRepository,
}: {
workflowId: string;
workflowVersionRepository: WorkspaceRepository<WorkflowVersionWorkspaceEntity>;
}) {
const statuses: WorkflowStatus[] = [];
const workflowVersions = await workflowVersionRepository.find({
where: {
workflowId,
status: In([
WorkflowVersionStatus.ACTIVE,
WorkflowVersionStatus.DRAFT,
WorkflowVersionStatus.DEACTIVATED,
]),
},
});
const hasDraftVersion = workflowVersions.some(
(version) => version.status === WorkflowVersionStatus.DRAFT,
);
if (hasDraftVersion) {
statuses.push(WorkflowStatus.DRAFT);
}
const hasActiveVersion = workflowVersions.some(
(version) => version.status === WorkflowVersionStatus.ACTIVE,
);
if (hasActiveVersion) {
statuses.push(WorkflowStatus.ACTIVE);
}
const hasDeactivatedVersion = workflowVersions.some(
(version) => version.status === WorkflowVersionStatus.DEACTIVATED,
);
if (!hasActiveVersion && hasDeactivatedVersion) {
statuses.push(WorkflowStatus.DEACTIVATED);
}
return statuses;
}
}

View File

@ -1,37 +0,0 @@
import isEqual from 'lodash.isequal';
import { WorkflowStatus } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import {
ACTIVE_AND_DRAFT_STATUSES,
ACTIVE_STATUSES,
DEACTIVATED_AND_DRAFT_STATUSES,
DEACTIVATED_STATUSES,
DRAFT_STATUSES,
} from 'src/modules/workflow/workflow-status/constants/workflow-status.constants';
import { WorkflowStatusCombination } from 'src/modules/workflow/workflow-status/enums/workflow-status.enum';
export const getStatusCombinationFromArray = (
statuses: WorkflowStatus[],
): WorkflowStatusCombination => {
if (isEqual(statuses, ACTIVE_AND_DRAFT_STATUSES)) {
return WorkflowStatusCombination.ACTIVE_AND_DRAFT;
}
if (isEqual(statuses, ACTIVE_STATUSES)) {
return WorkflowStatusCombination.ACTIVE;
}
if (isEqual(statuses, DEACTIVATED_AND_DRAFT_STATUSES)) {
return WorkflowStatusCombination.DEACTIVATED_AND_DRAFT;
}
if (isEqual(statuses, DEACTIVATED_STATUSES)) {
return WorkflowStatusCombination.DEACTIVATED;
}
if (isEqual(statuses, DRAFT_STATUSES)) {
return WorkflowStatusCombination.DRAFT;
}
return WorkflowStatusCombination.NO_STATUSES;
};

View File

@ -1,75 +0,0 @@
import { WorkflowVersionStatus } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { WorkflowStatusCombination } from 'src/modules/workflow/workflow-status/enums/workflow-status.enum';
export const getStatusCombinationFromUpdate = (
previousCombination: WorkflowStatusCombination,
statusToRemove?: WorkflowVersionStatus,
statusToAdd?: WorkflowVersionStatus,
): WorkflowStatusCombination => {
switch (previousCombination) {
case WorkflowStatusCombination.ACTIVE_AND_DRAFT:
if (
statusToAdd === WorkflowVersionStatus.ACTIVE &&
statusToRemove === WorkflowVersionStatus.DRAFT
) {
return WorkflowStatusCombination.ACTIVE;
}
if (statusToRemove === WorkflowVersionStatus.DRAFT) {
return WorkflowStatusCombination.ACTIVE;
}
break;
case WorkflowStatusCombination.DEACTIVATED_AND_DRAFT:
if (
statusToRemove === WorkflowVersionStatus.DRAFT &&
statusToAdd === WorkflowVersionStatus.ACTIVE
) {
return WorkflowStatusCombination.ACTIVE;
}
if (statusToRemove === WorkflowVersionStatus.DRAFT) {
return WorkflowStatusCombination.DEACTIVATED;
}
break;
case WorkflowStatusCombination.ACTIVE:
if (
statusToRemove === WorkflowVersionStatus.ACTIVE &&
statusToAdd === WorkflowVersionStatus.DEACTIVATED
) {
return WorkflowStatusCombination.DEACTIVATED;
}
if (!statusToRemove && statusToAdd === WorkflowVersionStatus.DRAFT) {
return WorkflowStatusCombination.ACTIVE_AND_DRAFT;
}
break;
case WorkflowStatusCombination.DEACTIVATED:
if (
statusToRemove === WorkflowVersionStatus.DEACTIVATED &&
statusToAdd === WorkflowVersionStatus.ACTIVE
) {
return WorkflowStatusCombination.ACTIVE;
}
if (!statusToRemove && statusToAdd === WorkflowVersionStatus.DRAFT) {
return WorkflowStatusCombination.DEACTIVATED_AND_DRAFT;
}
break;
case WorkflowStatusCombination.DRAFT:
if (
statusToRemove === WorkflowVersionStatus.DRAFT &&
statusToAdd === WorkflowVersionStatus.ACTIVE
) {
return WorkflowStatusCombination.ACTIVE;
}
if (statusToRemove === WorkflowVersionStatus.DRAFT) {
return WorkflowStatusCombination.NO_STATUSES;
}
break;
case WorkflowStatusCombination.NO_STATUSES:
if (statusToAdd === WorkflowVersionStatus.DRAFT) {
return WorkflowStatusCombination.DRAFT;
}
break;
default:
break;
}
return previousCombination;
};

View File

@ -1,29 +0,0 @@
import { WorkflowStatus } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import {
ACTIVE_AND_DRAFT_STATUSES,
ACTIVE_STATUSES,
DEACTIVATED_AND_DRAFT_STATUSES,
DEACTIVATED_STATUSES,
DRAFT_STATUSES,
NO_STATUSES,
} from 'src/modules/workflow/workflow-status/constants/workflow-status.constants';
import { WorkflowStatusCombination } from 'src/modules/workflow/workflow-status/enums/workflow-status.enum';
export const getWorkflowStatusesFromCombination = (
combination: WorkflowStatusCombination,
): WorkflowStatus[] => {
switch (combination) {
case WorkflowStatusCombination.ACTIVE:
return ACTIVE_STATUSES;
case WorkflowStatusCombination.DRAFT:
return DRAFT_STATUSES;
case WorkflowStatusCombination.DEACTIVATED:
return DEACTIVATED_STATUSES;
case WorkflowStatusCombination.ACTIVE_AND_DRAFT:
return ACTIVE_AND_DRAFT_STATUSES;
case WorkflowStatusCombination.DEACTIVATED_AND_DRAFT:
return DEACTIVATED_AND_DRAFT_STATUSES;
case WorkflowStatusCombination.NO_STATUSES:
return NO_STATUSES;
}
};