Add workflow statuses (#6765)
Following figma updates https://www.figma.com/design/PNBfTgOVraw557OXChYagk/Explo?node-id=21872-7929&t=DOUzd6rzwr6lprcs-0 - No activity targets for workflow entities for now - Adding a direct relation between workflow run et workflow - Adding a status on the version (draft, active, deactivated) - Adding a list of statuses on workflow - publishedVersionId => lastPublishedVersionId Also adding: - the endpoint to deactivate a version
This commit is contained in:
@ -0,0 +1,12 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
|
||||
import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module';
|
||||
import { DatabaseEventTriggerService } from 'src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.service';
|
||||
import { DatabaseEventTriggerListener } from 'src/modules/workflow/workflow-trigger/database-event-trigger/listeners/database-event-trigger.listener';
|
||||
|
||||
@Module({
|
||||
imports: [FeatureFlagModule],
|
||||
providers: [DatabaseEventTriggerService, DatabaseEventTriggerListener],
|
||||
exports: [DatabaseEventTriggerService],
|
||||
})
|
||||
export class DatabaseEventTriggerModule {}
|
||||
@ -0,0 +1,50 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { EntityManager } from 'typeorm';
|
||||
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
|
||||
import { WorkflowDatabaseEventTrigger } from 'src/modules/workflow/common/types/workflow-trigger.type';
|
||||
|
||||
@Injectable()
|
||||
export class DatabaseEventTriggerService {
|
||||
constructor(private readonly twentyORMManager: TwentyORMManager) {}
|
||||
|
||||
async createEventListener(
|
||||
workflowId: string,
|
||||
trigger: WorkflowDatabaseEventTrigger,
|
||||
manager: EntityManager,
|
||||
) {
|
||||
const eventName = trigger.settings.eventName;
|
||||
|
||||
const workflowEventListenerRepository =
|
||||
await this.twentyORMManager.getRepository<WorkflowEventListenerWorkspaceEntity>(
|
||||
'workflowEventListener',
|
||||
);
|
||||
|
||||
const workflowEventListener = await workflowEventListenerRepository.create({
|
||||
workflowId,
|
||||
eventName,
|
||||
});
|
||||
|
||||
await workflowEventListenerRepository.save(
|
||||
workflowEventListener,
|
||||
{},
|
||||
manager,
|
||||
);
|
||||
}
|
||||
|
||||
async deleteEventListener(workflowId: string, manager: EntityManager) {
|
||||
const workflowEventListenerRepository =
|
||||
await this.twentyORMManager.getRepository<WorkflowEventListenerWorkspaceEntity>(
|
||||
'workflowEventListener',
|
||||
);
|
||||
|
||||
await workflowEventListenerRepository.delete(
|
||||
{
|
||||
workflowId,
|
||||
},
|
||||
manager,
|
||||
);
|
||||
}
|
||||
}
|
||||
@ -5,6 +5,10 @@ import { Processor } from 'src/engine/integrations/message-queue/decorators/proc
|
||||
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
|
||||
import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import {
|
||||
WorkflowVersionStatus,
|
||||
WorkflowVersionWorkspaceEntity,
|
||||
} from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
|
||||
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
|
||||
import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-runner.workspace-service';
|
||||
import {
|
||||
@ -36,16 +40,32 @@ export class WorkflowEventTriggerJob {
|
||||
id: data.workflowId,
|
||||
});
|
||||
|
||||
if (!workflow.publishedVersionId) {
|
||||
if (!workflow.lastPublishedVersionId) {
|
||||
throw new WorkflowTriggerException(
|
||||
'Workflow has no published version',
|
||||
WorkflowTriggerExceptionCode.INTERNAL_ERROR,
|
||||
);
|
||||
}
|
||||
|
||||
const workflowVersionRepository =
|
||||
await this.twentyORMManager.getRepository<WorkflowVersionWorkspaceEntity>(
|
||||
'workflowVersion',
|
||||
);
|
||||
|
||||
const workflowVersion = await workflowVersionRepository.findOneByOrFail({
|
||||
id: workflow.lastPublishedVersionId,
|
||||
});
|
||||
|
||||
if (workflowVersion.status !== WorkflowVersionStatus.ACTIVE) {
|
||||
throw new WorkflowTriggerException(
|
||||
'Workflow version is not active',
|
||||
WorkflowTriggerExceptionCode.INTERNAL_ERROR,
|
||||
);
|
||||
}
|
||||
|
||||
await this.workflowRunnerWorkspaceService.run(
|
||||
data.workspaceId,
|
||||
workflow.publishedVersionId,
|
||||
workflow.lastPublishedVersionId,
|
||||
data.payload,
|
||||
{
|
||||
source: FieldActorSource.WORKFLOW,
|
||||
|
||||
@ -1,4 +1,8 @@
|
||||
import { WorkflowVersionWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
|
||||
import {
|
||||
WorkflowVersionStatus,
|
||||
WorkflowVersionWorkspaceEntity,
|
||||
} from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
|
||||
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
|
||||
import {
|
||||
WorkflowTrigger,
|
||||
WorkflowTriggerType,
|
||||
@ -8,7 +12,32 @@ import {
|
||||
WorkflowTriggerExceptionCode,
|
||||
} from 'src/modules/workflow/workflow-trigger/workflow-trigger.exception';
|
||||
|
||||
export function assertWorkflowVersionIsValid(
|
||||
export function assertVersionCanBeActivated(
|
||||
workflowVersion: Omit<WorkflowVersionWorkspaceEntity, 'trigger'> & {
|
||||
trigger: WorkflowTrigger;
|
||||
},
|
||||
workflow: WorkflowWorkspaceEntity,
|
||||
) {
|
||||
assertVersionIsValid(workflowVersion);
|
||||
|
||||
const isLastPublishedVersion =
|
||||
workflow.lastPublishedVersionId === workflowVersion.id;
|
||||
|
||||
const isDraft = workflowVersion.status === WorkflowVersionStatus.DRAFT;
|
||||
|
||||
const isLastPublishedVersionDeactivated =
|
||||
workflowVersion.status === WorkflowVersionStatus.DEACTIVATED &&
|
||||
isLastPublishedVersion;
|
||||
|
||||
if (!isDraft && !isLastPublishedVersionDeactivated) {
|
||||
throw new WorkflowTriggerException(
|
||||
'Cannot activate non-draft or non-last-published version',
|
||||
WorkflowTriggerExceptionCode.INVALID_INPUT,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
function assertVersionIsValid(
|
||||
workflowVersion: Omit<WorkflowVersionWorkspaceEntity, 'trigger'> & {
|
||||
trigger: WorkflowTrigger;
|
||||
},
|
||||
@ -12,5 +12,6 @@ export enum WorkflowTriggerExceptionCode {
|
||||
INVALID_WORKFLOW_TRIGGER = 'INVALID_WORKFLOW_TRIGGER',
|
||||
INVALID_WORKFLOW_VERSION = 'INVALID_WORKFLOW_VERSION',
|
||||
INVALID_ACTION_TYPE = 'INVALID_ACTION_TYPE',
|
||||
FORBIDDEN = 'FORBIDDEN',
|
||||
INTERNAL_ERROR = 'INTERNAL_ERROR',
|
||||
}
|
||||
|
||||
@ -1,19 +1,21 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
|
||||
import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module';
|
||||
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
|
||||
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
|
||||
import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module';
|
||||
import { DatabaseEventTriggerModule } from 'src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.module';
|
||||
import { WorkflowEventTriggerJob } from 'src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job';
|
||||
import { DatabaseEventTriggerListener } from 'src/modules/workflow/workflow-trigger/listeners/database-event-trigger.listener';
|
||||
import { WorkflowTriggerWorkspaceService } from 'src/modules/workflow/workflow-trigger/workflow-trigger.workspace-service';
|
||||
|
||||
@Module({
|
||||
imports: [WorkflowCommonModule, WorkflowRunnerModule, FeatureFlagModule],
|
||||
imports: [
|
||||
WorkflowCommonModule,
|
||||
WorkflowRunnerModule,
|
||||
DatabaseEventTriggerModule,
|
||||
],
|
||||
providers: [
|
||||
WorkflowTriggerWorkspaceService,
|
||||
ScopedWorkspaceContextFactory,
|
||||
DatabaseEventTriggerListener,
|
||||
WorkflowEventTriggerJob,
|
||||
],
|
||||
exports: [WorkflowTriggerWorkspaceService],
|
||||
|
||||
@ -1,18 +1,21 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
|
||||
import { EntityManager } from 'typeorm';
|
||||
|
||||
import { buildCreatedByFromWorkspaceMember } from 'src/engine/core-modules/actor/utils/build-created-by-from-workspace-member.util';
|
||||
import { User } from 'src/engine/core-modules/user/user.entity';
|
||||
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
|
||||
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
|
||||
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
|
||||
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
|
||||
import {
|
||||
WorkflowDatabaseEventTrigger,
|
||||
WorkflowTriggerType,
|
||||
} from 'src/modules/workflow/common/types/workflow-trigger.type';
|
||||
WorkflowVersionStatus,
|
||||
WorkflowVersionWorkspaceEntity,
|
||||
} from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
|
||||
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
|
||||
import { WorkflowTriggerType } from 'src/modules/workflow/common/types/workflow-trigger.type';
|
||||
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workflow-common.workspace-service';
|
||||
import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-runner.workspace-service';
|
||||
import { assertWorkflowVersionIsValid } from 'src/modules/workflow/workflow-trigger/utils/assert-workflow-version-is-valid';
|
||||
import { DatabaseEventTriggerService } from 'src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.service';
|
||||
import { assertVersionCanBeActivated } from 'src/modules/workflow/workflow-trigger/utils/assert-version-can-be-activated.util';
|
||||
import {
|
||||
WorkflowTriggerException,
|
||||
WorkflowTriggerExceptionCode,
|
||||
@ -25,6 +28,7 @@ export class WorkflowTriggerWorkspaceService {
|
||||
private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService,
|
||||
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
|
||||
private readonly workflowRunnerWorkspaceService: WorkflowRunnerWorkspaceService,
|
||||
private readonly databaseEventTriggerService: DatabaseEventTriggerService,
|
||||
) {}
|
||||
|
||||
async runWorkflowVersion(
|
||||
@ -42,17 +46,9 @@ export class WorkflowTriggerWorkspaceService {
|
||||
);
|
||||
}
|
||||
|
||||
const workflowVersion =
|
||||
await this.workflowCommonWorkspaceService.getWorkflowVersion(
|
||||
workflowVersionId,
|
||||
);
|
||||
|
||||
if (!workflowVersion) {
|
||||
throw new WorkflowTriggerException(
|
||||
'No workflow version found',
|
||||
WorkflowTriggerExceptionCode.INVALID_WORKFLOW_VERSION,
|
||||
);
|
||||
}
|
||||
await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail(
|
||||
workflowVersionId,
|
||||
);
|
||||
|
||||
return await this.workflowRunnerWorkspaceService.run(
|
||||
workspaceId,
|
||||
@ -64,72 +60,178 @@ export class WorkflowTriggerWorkspaceService {
|
||||
|
||||
async enableWorkflowTrigger(workflowVersionId: string) {
|
||||
const workflowVersion =
|
||||
await this.workflowCommonWorkspaceService.getWorkflowVersion(
|
||||
await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail(
|
||||
workflowVersionId,
|
||||
);
|
||||
|
||||
assertWorkflowVersionIsValid(workflowVersion);
|
||||
|
||||
switch (workflowVersion.trigger.type) {
|
||||
case WorkflowTriggerType.DATABASE_EVENT:
|
||||
await this.upsertEventListenerAndPublishVersion(
|
||||
workflowVersion.workflowId,
|
||||
workflowVersionId,
|
||||
workflowVersion.trigger,
|
||||
);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private async upsertEventListenerAndPublishVersion(
|
||||
workflowId: string,
|
||||
workflowVersionId: string,
|
||||
trigger: WorkflowDatabaseEventTrigger,
|
||||
) {
|
||||
const eventName = trigger.settings.eventName;
|
||||
|
||||
const workflowEventListenerRepository =
|
||||
await this.twentyORMManager.getRepository<WorkflowEventListenerWorkspaceEntity>(
|
||||
'workflowEventListener',
|
||||
);
|
||||
|
||||
const workflowEventListener = await workflowEventListenerRepository.create({
|
||||
workflowId,
|
||||
eventName,
|
||||
});
|
||||
|
||||
const workspaceDataSource = await this.twentyORMManager.getDatasource();
|
||||
|
||||
const workflowRepository =
|
||||
await this.twentyORMManager.getRepository<WorkflowWorkspaceEntity>(
|
||||
'workflow',
|
||||
);
|
||||
|
||||
await workspaceDataSource?.transaction(async (transactionManager) => {
|
||||
// TODO: Use upsert when available for workspace entities
|
||||
await workflowEventListenerRepository.delete(
|
||||
{
|
||||
workflowId,
|
||||
eventName,
|
||||
},
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
await workflowEventListenerRepository.save(
|
||||
workflowEventListener,
|
||||
{},
|
||||
transactionManager,
|
||||
);
|
||||
|
||||
await workflowRepository.update(
|
||||
{ id: workflowId },
|
||||
{ publishedVersionId: workflowVersionId },
|
||||
transactionManager,
|
||||
);
|
||||
const workflow = await workflowRepository.findOne({
|
||||
where: { id: workflowVersion.workflowId },
|
||||
});
|
||||
|
||||
if (!workflow) {
|
||||
throw new WorkflowTriggerException(
|
||||
'No workflow found',
|
||||
WorkflowTriggerExceptionCode.INVALID_WORKFLOW_VERSION,
|
||||
);
|
||||
}
|
||||
|
||||
assertVersionCanBeActivated(workflowVersion, workflow);
|
||||
|
||||
const workspaceDataSource = await this.twentyORMManager.getDatasource();
|
||||
const queryRunner = workspaceDataSource.createQueryRunner();
|
||||
|
||||
await queryRunner.connect();
|
||||
await queryRunner.startTransaction();
|
||||
|
||||
const manager = queryRunner.manager;
|
||||
|
||||
try {
|
||||
if (
|
||||
workflow.lastPublishedVersionId &&
|
||||
workflowVersionId !== workflow.lastPublishedVersionId
|
||||
) {
|
||||
await this.disableWorkflowTriggerWithManager(
|
||||
workflow.lastPublishedVersionId,
|
||||
manager,
|
||||
);
|
||||
}
|
||||
|
||||
await this.activateWorkflowVersion(
|
||||
workflowVersion.workflowId,
|
||||
workflowVersionId,
|
||||
manager,
|
||||
);
|
||||
await workflowRepository.update(
|
||||
{ id: workflow.id },
|
||||
{ lastPublishedVersionId: workflowVersionId },
|
||||
manager,
|
||||
);
|
||||
|
||||
switch (workflowVersion.trigger.type) {
|
||||
case WorkflowTriggerType.DATABASE_EVENT:
|
||||
await this.databaseEventTriggerService.createEventListener(
|
||||
workflowVersion.workflowId,
|
||||
workflowVersion.trigger,
|
||||
manager,
|
||||
);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
await queryRunner.commitTransaction();
|
||||
|
||||
return true;
|
||||
} catch (error) {
|
||||
await queryRunner.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await queryRunner.release();
|
||||
}
|
||||
}
|
||||
|
||||
async disableWorkflowTrigger(workflowVersionId: string) {
|
||||
const workspaceDataSource = await this.twentyORMManager.getDatasource();
|
||||
const queryRunner = workspaceDataSource.createQueryRunner();
|
||||
|
||||
await queryRunner.connect();
|
||||
await queryRunner.startTransaction();
|
||||
|
||||
try {
|
||||
await this.disableWorkflowTriggerWithManager(
|
||||
workflowVersionId,
|
||||
queryRunner.manager,
|
||||
);
|
||||
|
||||
await queryRunner.commitTransaction();
|
||||
|
||||
return true;
|
||||
} catch (error) {
|
||||
await queryRunner.rollbackTransaction();
|
||||
throw error;
|
||||
} finally {
|
||||
await queryRunner.release();
|
||||
}
|
||||
}
|
||||
|
||||
private async disableWorkflowTriggerWithManager(
|
||||
workflowVersionId: string,
|
||||
manager: EntityManager,
|
||||
) {
|
||||
const workflowVersionRepository =
|
||||
await this.twentyORMManager.getRepository<WorkflowVersionWorkspaceEntity>(
|
||||
'workflowVersion',
|
||||
);
|
||||
|
||||
const workflowVersion = await workflowVersionRepository.findOne({
|
||||
where: { id: workflowVersionId },
|
||||
});
|
||||
|
||||
if (!workflowVersion) {
|
||||
throw new WorkflowTriggerException(
|
||||
'No workflow version found',
|
||||
WorkflowTriggerExceptionCode.INVALID_INPUT,
|
||||
);
|
||||
}
|
||||
|
||||
if (workflowVersion.status !== WorkflowVersionStatus.ACTIVE) {
|
||||
throw new WorkflowTriggerException(
|
||||
'Cannot disable non-active workflow version',
|
||||
WorkflowTriggerExceptionCode.INVALID_INPUT,
|
||||
);
|
||||
}
|
||||
|
||||
await workflowVersionRepository.update(
|
||||
{ id: workflowVersionId },
|
||||
{ status: WorkflowVersionStatus.DEACTIVATED },
|
||||
manager,
|
||||
);
|
||||
|
||||
switch (workflowVersion?.trigger?.type) {
|
||||
case WorkflowTriggerType.DATABASE_EVENT:
|
||||
await this.databaseEventTriggerService.deleteEventListener(
|
||||
workflowVersion.workflowId,
|
||||
manager,
|
||||
);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
private async activateWorkflowVersion(
|
||||
workflowId: string,
|
||||
workflowVersionId: string,
|
||||
manager: EntityManager,
|
||||
) {
|
||||
const workflowVersionRepository =
|
||||
await this.twentyORMManager.getRepository<WorkflowVersionWorkspaceEntity>(
|
||||
'workflowVersion',
|
||||
);
|
||||
|
||||
const activeWorkflowVersions = await workflowVersionRepository.find(
|
||||
{
|
||||
where: { workflowId, status: WorkflowVersionStatus.ACTIVE },
|
||||
},
|
||||
manager,
|
||||
);
|
||||
|
||||
if (activeWorkflowVersions.length > 0) {
|
||||
throw new WorkflowTriggerException(
|
||||
'Cannot have more than one active workflow version',
|
||||
WorkflowTriggerExceptionCode.FORBIDDEN,
|
||||
);
|
||||
}
|
||||
|
||||
await workflowVersionRepository.update(
|
||||
{ id: workflowVersionId },
|
||||
{ status: WorkflowVersionStatus.ACTIVE },
|
||||
manager,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user