Set statuses on workflows (#6792)

Add listener to keep status on workflows up to date:
- version draft => statuses should contain draft
- version active => statuses should contain active
- version deactivated => if no version active, statuses should contain
deactivated

Renaming also the endpoints because it was not reflecting the full
behaviour.

Finally, adding a new status Archived for versions. Will be used when a
version is deactivated, but is not the last published version anymore.
It means this version cannot be re-activated.
This commit is contained in:
Thomas Trompette
2024-08-30 18:06:04 +02:00
committed by GitHub
parent f7c99ddc7a
commit a3ea0acd1a
15 changed files with 1010 additions and 92 deletions

View File

@ -5,15 +5,21 @@ import { EntityManager } from 'typeorm';
import { buildCreatedByFromWorkspaceMember } from 'src/engine/core-modules/actor/utils/build-created-by-from-workspace-member.util';
import { User } from 'src/engine/core-modules/user/user.entity';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import {
WorkflowVersionStatus,
WorkflowVersionWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import { WorkflowTriggerType } from 'src/modules/workflow/common/types/workflow-trigger.type';
import {
WorkflowTrigger,
WorkflowTriggerType,
} from 'src/modules/workflow/common/types/workflow-trigger.type';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workflow-common.workspace-service';
import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-runner.workspace-service';
import { WorkflowVersionStatusUpdate } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job';
import { DatabaseEventTriggerService } from 'src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.service';
import { assertVersionCanBeActivated } from 'src/modules/workflow/workflow-trigger/utils/assert-version-can-be-activated.util';
import {
@ -29,6 +35,7 @@ export class WorkflowTriggerWorkspaceService {
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
private readonly workflowRunnerWorkspaceService: WorkflowRunnerWorkspaceService,
private readonly databaseEventTriggerService: DatabaseEventTriggerService,
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
) {}
async runWorkflowVersion(
@ -58,10 +65,19 @@ export class WorkflowTriggerWorkspaceService {
);
}
async enableWorkflowTrigger(workflowVersionId: string) {
async activateWorkflowVersion(workflowVersionId: string) {
const workflowVersionRepository =
await this.twentyORMManager.getRepository<WorkflowVersionWorkspaceEntity>(
'workflowVersion',
);
const workflowVersionNullable = await workflowVersionRepository.findOne({
where: { id: workflowVersionId },
});
const workflowVersion =
await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail(
workflowVersionId,
await this.workflowCommonWorkspaceService.getValidWorkflowVersionOrFail(
workflowVersionNullable,
);
const workflowRepository =
@ -91,38 +107,13 @@ export class WorkflowTriggerWorkspaceService {
const manager = queryRunner.manager;
try {
if (
workflow.lastPublishedVersionId &&
workflowVersionId !== workflow.lastPublishedVersionId
) {
await this.disableWorkflowTriggerWithManager(
workflow.lastPublishedVersionId,
manager,
);
}
await this.activateWorkflowVersion(
workflowVersion.workflowId,
workflowVersionId,
await this.performActivationSteps(
workflow,
workflowVersion,
workflowRepository,
workflowVersionRepository,
manager,
);
await workflowRepository.update(
{ id: workflow.id },
{ lastPublishedVersionId: workflowVersionId },
manager,
);
switch (workflowVersion.trigger.type) {
case WorkflowTriggerType.DATABASE_EVENT:
await this.databaseEventTriggerService.createEventListener(
workflowVersion.workflowId,
workflowVersion.trigger,
manager,
);
break;
default:
break;
}
await queryRunner.commitTransaction();
@ -135,7 +126,7 @@ export class WorkflowTriggerWorkspaceService {
}
}
async disableWorkflowTrigger(workflowVersionId: string) {
async deactivateWorkflowVersion(workflowVersionId: string) {
const workspaceDataSource = await this.twentyORMManager.getDatasource();
const queryRunner = workspaceDataSource.createQueryRunner();
@ -143,8 +134,14 @@ export class WorkflowTriggerWorkspaceService {
await queryRunner.startTransaction();
try {
await this.disableWorkflowTriggerWithManager(
const workflowVersionRepository =
await this.twentyORMManager.getRepository<WorkflowVersionWorkspaceEntity>(
'workflowVersion',
);
await this.performDeactivationSteps(
workflowVersionId,
workflowVersionRepository,
queryRunner.manager,
);
@ -159,64 +156,79 @@ export class WorkflowTriggerWorkspaceService {
}
}
private async disableWorkflowTriggerWithManager(
workflowVersionId: string,
private async performActivationSteps(
workflow: WorkflowWorkspaceEntity,
workflowVersion: Omit<WorkflowVersionWorkspaceEntity, 'trigger'> & {
trigger: WorkflowTrigger;
},
workflowRepository: WorkspaceRepository<WorkflowWorkspaceEntity>,
workflowVersionRepository: WorkspaceRepository<WorkflowVersionWorkspaceEntity>,
manager: EntityManager,
) {
const workflowVersionRepository =
await this.twentyORMManager.getRepository<WorkflowVersionWorkspaceEntity>(
'workflowVersion',
);
const workflowVersion = await workflowVersionRepository.findOne({
where: { id: workflowVersionId },
});
if (!workflowVersion) {
throw new WorkflowTriggerException(
'No workflow version found',
WorkflowTriggerExceptionCode.INVALID_INPUT,
if (
workflow.lastPublishedVersionId &&
workflowVersion.id !== workflow.lastPublishedVersionId
) {
await this.performDeactivationSteps(
workflow.lastPublishedVersionId,
workflowVersionRepository,
manager,
);
}
if (workflowVersion.status !== WorkflowVersionStatus.ACTIVE) {
throw new WorkflowTriggerException(
'Cannot disable non-active workflow version',
WorkflowTriggerExceptionCode.INVALID_INPUT,
);
}
await workflowVersionRepository.update(
{ id: workflowVersionId },
{ status: WorkflowVersionStatus.DEACTIVATED },
await this.upgradeWorkflowVersion(
workflow,
workflowVersion.id,
workflowRepository,
workflowVersionRepository,
manager,
);
switch (workflowVersion?.trigger?.type) {
case WorkflowTriggerType.DATABASE_EVENT:
await this.databaseEventTriggerService.deleteEventListener(
workflowVersion.workflowId,
manager,
);
break;
default:
break;
}
await this.setActiveVersionStatus(
workflowVersion,
workflowVersionRepository,
manager,
);
await this.enableTrigger(workflowVersion, manager);
}
private async activateWorkflowVersion(
workflowId: string,
private async performDeactivationSteps(
workflowVersionId: string,
workflowVersionRepository: WorkspaceRepository<WorkflowVersionWorkspaceEntity>,
manager: EntityManager,
) {
const workflowVersionRepository =
await this.twentyORMManager.getRepository<WorkflowVersionWorkspaceEntity>(
'workflowVersion',
const workflowVersionNullable = await workflowVersionRepository.findOne({
where: { id: workflowVersionId },
});
const workflowVersion =
await this.workflowCommonWorkspaceService.getValidWorkflowVersionOrFail(
workflowVersionNullable,
);
await this.setDeactivatedVersionStatus(
workflowVersion,
workflowVersionRepository,
manager,
);
await this.disableTrigger(workflowVersion, manager);
}
private async setActiveVersionStatus(
workflowVersion: Omit<WorkflowVersionWorkspaceEntity, 'trigger'> & {
trigger: WorkflowTrigger;
},
workflowVersionRepository: WorkspaceRepository<WorkflowVersionWorkspaceEntity>,
manager: EntityManager,
) {
const activeWorkflowVersions = await workflowVersionRepository.find(
{
where: { workflowId, status: WorkflowVersionStatus.ACTIVE },
where: {
workflowId: workflowVersion.workflowId,
status: WorkflowVersionStatus.ACTIVE,
},
},
manager,
);
@ -229,9 +241,132 @@ export class WorkflowTriggerWorkspaceService {
}
await workflowVersionRepository.update(
{ id: workflowVersionId },
{ id: workflowVersion.id },
{ status: WorkflowVersionStatus.ACTIVE },
manager,
);
this.emitStatusUpdateEventOrThrow(
workflowVersion.workflowId,
workflowVersion.status,
WorkflowVersionStatus.ACTIVE,
);
}
private async setDeactivatedVersionStatus(
workflowVersion: Omit<WorkflowVersionWorkspaceEntity, 'trigger'> & {
trigger: WorkflowTrigger;
},
workflowVersionRepository: WorkspaceRepository<WorkflowVersionWorkspaceEntity>,
manager: EntityManager,
) {
if (workflowVersion.status !== WorkflowVersionStatus.ACTIVE) {
throw new WorkflowTriggerException(
'Cannot disable non-active workflow version',
WorkflowTriggerExceptionCode.FORBIDDEN,
);
}
await workflowVersionRepository.update(
{ id: workflowVersion.id },
{ status: WorkflowVersionStatus.DEACTIVATED },
manager,
);
this.emitStatusUpdateEventOrThrow(
workflowVersion.workflowId,
workflowVersion.status,
WorkflowVersionStatus.DEACTIVATED,
);
}
private async upgradeWorkflowVersion(
workflow: WorkflowWorkspaceEntity,
newPublishedVersionId: string,
workflowRepository: WorkspaceRepository<WorkflowWorkspaceEntity>,
workflowVersionRepository: WorkspaceRepository<WorkflowVersionWorkspaceEntity>,
manager: EntityManager,
) {
if (workflow.lastPublishedVersionId === newPublishedVersionId) {
return;
}
if (workflow.lastPublishedVersionId) {
await workflowVersionRepository.update(
{ id: workflow.lastPublishedVersionId },
{ status: WorkflowVersionStatus.ARCHIVED },
manager,
);
}
await workflowRepository.update(
{ id: workflow.id },
{ lastPublishedVersionId: newPublishedVersionId },
manager,
);
}
private async enableTrigger(
workflowVersion: Omit<WorkflowVersionWorkspaceEntity, 'trigger'> & {
trigger: WorkflowTrigger;
},
manager: EntityManager,
) {
switch (workflowVersion.trigger.type) {
case WorkflowTriggerType.DATABASE_EVENT:
await this.databaseEventTriggerService.createEventListener(
workflowVersion.workflowId,
workflowVersion.trigger,
manager,
);
break;
default:
break;
}
}
private async disableTrigger(
workflowVersion: Omit<WorkflowVersionWorkspaceEntity, 'trigger'> & {
trigger: WorkflowTrigger;
},
manager: EntityManager,
) {
switch (workflowVersion.trigger.type) {
case WorkflowTriggerType.DATABASE_EVENT:
await this.databaseEventTriggerService.deleteEventListener(
workflowVersion.workflowId,
manager,
);
break;
default:
break;
}
}
private emitStatusUpdateEventOrThrow(
workflowId: string,
previousStatus: WorkflowVersionStatus,
newStatus: WorkflowVersionStatus,
) {
const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId;
if (!workspaceId) {
throw new WorkflowTriggerException(
'No workspace id found',
WorkflowTriggerExceptionCode.INTERNAL_ERROR,
);
}
this.workspaceEventEmitter.emit(
'workflowVersion.statusUpdated',
[
{
workflowId,
previousStatus,
newStatus,
} satisfies WorkflowVersionStatusUpdate,
],
workspaceId,
);
}
}