Update workflow statuses in real time (#11653)

Statuses are maintained by an async job. Those are calculations that we
would like to avoid using in both frontend and backend. Using push
updates are an easier way.



https://github.com/user-attachments/assets/31e44a82-08a8-4100-a38e-c965d5c73ee8
This commit is contained in:
Thomas Trompette
2025-04-22 11:03:12 +02:00
committed by GitHub
parent 3ce3793129
commit 33e0794da9
7 changed files with 223 additions and 39 deletions

View File

@ -1,9 +1,12 @@
import { useRecordIndexContextOrThrow } from '@/object-record/record-index/contexts/RecordIndexContext';
import { RecordValueSetterEffect } from '@/object-record/record-store/components/RecordValueSetterEffect';
import { RecordTableCellCheckbox } from '@/object-record/record-table/record-table-cell/components/RecordTableCellCheckbox';
import { RecordTableCellGrip } from '@/object-record/record-table/record-table-cell/components/RecordTableCellGrip';
import { RecordTableLastEmptyCell } from '@/object-record/record-table/record-table-cell/components/RecordTableLastEmptyCell';
import { RecordTableCells } from '@/object-record/record-table/record-table-row/components/RecordTableCells';
import { RecordTableDraggableTr } from '@/object-record/record-table/record-table-row/components/RecordTableDraggableTr';
import { ListenRecordUpdatesEffect } from '@/subscription/components/ListenUpdatesEffect';
import { getDefaultRecordFieldsToListen } from '@/subscription/utils/getDefaultRecordFieldsToListen.util';
type RecordTableRowProps = {
recordId: string;
@ -16,6 +19,11 @@ export const RecordTableRow = ({
rowIndexForFocus,
rowIndexForDrag,
}: RecordTableRowProps) => {
const { objectNameSingular } = useRecordIndexContextOrThrow();
const listenedFields = getDefaultRecordFieldsToListen({
objectNameSingular,
});
return (
<RecordTableDraggableTr
recordId={recordId}
@ -27,6 +35,11 @@ export const RecordTableRow = ({
<RecordTableCells />
<RecordTableLastEmptyCell />
<RecordValueSetterEffect recordId={recordId} />
<ListenRecordUpdatesEffect
objectNameSingular={objectNameSingular}
recordId={recordId}
listenedFields={listenedFields}
/>
</RecordTableDraggableTr>
);
};

View File

@ -1,7 +1,7 @@
import { useApolloClient } from '@apollo/client';
import { useOnDbEvent } from '@/subscription/hooks/useOnDbEvent';
import { DatabaseEventAction } from '~/generated/graphql';
import { useApolloClient } from '@apollo/client';
import { capitalize, isDefined } from 'twenty-shared/utils';
import { DatabaseEventAction } from '~/generated/graphql';
type ListenRecordUpdatesEffectProps = {
objectNameSingular: string;
@ -39,6 +39,7 @@ export const ListenRecordUpdatesEffect = ({
fields: fieldsUpdater,
});
},
skip: listenedFields.length === 0,
});
return null;

View File

@ -0,0 +1,14 @@
import { CoreObjectNameSingular } from '@/object-metadata/types/CoreObjectNameSingular';
export const getDefaultRecordFieldsToListen = ({
objectNameSingular,
}: {
objectNameSingular: string;
}) => {
switch (objectNameSingular) {
case CoreObjectNameSingular.Workflow:
return ['statuses'];
default:
return [];
}
};

View File

@ -1,13 +1,13 @@
import { Inject } from '@nestjs/common';
import { isDefined } from 'twenty-shared/utils';
import { RedisPubSub } from 'graphql-redis-subscriptions';
import { isDefined } from 'twenty-shared/utils';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { ObjectRecordEvent } from 'src/engine/core-modules/event-emitter/types/object-record-event.event';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { removeSecretFromWebhookRecord } from 'src/utils/remove-secret-from-webhook-record';
@Processor(MessageQueue.subscriptionsQueue)

View File

@ -1,6 +1,10 @@
import { Test, TestingModule } from '@nestjs/testing';
import { getRepositoryToken } from '@nestjs/typeorm';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import { WorkflowVersionStatus } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { WorkflowStatus } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import {
@ -8,7 +12,6 @@ import {
WorkflowVersionBatchEvent,
WorkflowVersionEventType,
} from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job';
import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service';
describe('WorkflowStatusesUpdate', () => {
let job: WorkflowStatusesUpdateJob;
@ -26,6 +29,10 @@ describe('WorkflowStatusesUpdate', () => {
publishOneServerlessFunction: jest.fn(),
};
const mockWorkspaceEventEmitter = {
emitDatabaseBatchEvent: jest.fn(),
};
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
@ -38,6 +45,18 @@ describe('WorkflowStatusesUpdate', () => {
provide: ServerlessFunctionService,
useValue: mockServerlessFunctionService,
},
{
provide: WorkspaceEventEmitter,
useValue: mockWorkspaceEventEmitter,
},
{
provide: getRepositoryToken(ObjectMetadataEntity, 'metadata'),
useValue: {
findOneOrFail: jest.fn().mockResolvedValue({
nameSingular: 'workflow',
}),
},
},
],
}).compile();
@ -69,6 +88,9 @@ describe('WorkflowStatusesUpdate', () => {
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1);
expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0);
expect(
mockWorkspaceEventEmitter.emitDatabaseBatchEvent,
).toHaveBeenCalledTimes(0);
});
it('when no draft yet, update statuses', async () => {
@ -92,6 +114,9 @@ describe('WorkflowStatusesUpdate', () => {
{ id: '1' },
{ statuses: [WorkflowStatus.ACTIVE, WorkflowStatus.DRAFT] },
);
expect(
mockWorkspaceEventEmitter.emitDatabaseBatchEvent,
).toHaveBeenCalledTimes(1);
});
});
@ -120,6 +145,9 @@ describe('WorkflowStatusesUpdate', () => {
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2);
expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0);
expect(
mockWorkspaceEventEmitter.emitDatabaseBatchEvent,
).toHaveBeenCalledTimes(0);
});
test('when update that should be impossible, do not do anything', async () => {
@ -146,6 +174,9 @@ describe('WorkflowStatusesUpdate', () => {
expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2);
expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0);
expect(
mockWorkspaceEventEmitter.emitDatabaseBatchEvent,
).toHaveBeenCalledTimes(0);
});
test('when WorkflowVersionStatus.DEACTIVATED to WorkflowVersionStatus.ACTIVE, should activate', async () => {
@ -175,6 +206,9 @@ describe('WorkflowStatusesUpdate', () => {
{ id: '1' },
{ statuses: [WorkflowStatus.ACTIVE] },
);
expect(
mockWorkspaceEventEmitter.emitDatabaseBatchEvent,
).toHaveBeenCalledTimes(1);
});
test('when WorkflowVersionStatus.ACTIVE to WorkflowVersionStatus.DEACTIVATED, should deactivate', async () => {
@ -204,6 +238,9 @@ describe('WorkflowStatusesUpdate', () => {
{ id: '1' },
{ statuses: [WorkflowStatus.DEACTIVATED] },
);
expect(
mockWorkspaceEventEmitter.emitDatabaseBatchEvent,
).toHaveBeenCalledTimes(1);
});
test('when WorkflowVersionStatus.DRAFT to WorkflowVersionStatus.ACTIVE, should activate', async () => {
@ -233,6 +270,9 @@ describe('WorkflowStatusesUpdate', () => {
{ id: '1' },
{ statuses: [WorkflowStatus.ACTIVE] },
);
expect(
mockWorkspaceEventEmitter.emitDatabaseBatchEvent,
).toHaveBeenCalledTimes(1);
});
});
@ -276,6 +316,9 @@ describe('WorkflowStatusesUpdate', () => {
{ id: '1' },
{ statuses: [] },
);
expect(
mockWorkspaceEventEmitter.emitDatabaseBatchEvent,
).toHaveBeenCalledTimes(1);
});
});
});

View File

@ -1,18 +1,27 @@
import { Logger, Scope } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { isDefined } from 'twenty-shared/utils';
import { Repository } from 'typeorm';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { ServerlessFunctionExceptionCode } from 'src/engine/metadata-modules/serverless-function/serverless-function.exception';
import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service';
import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import {
WorkflowVersionStatus,
WorkflowVersionWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity';
import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import {
WorkflowStatus,
WorkflowWorkspaceEntity,
} from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity';
import {
WorkflowAction,
WorkflowActionType,
@ -20,7 +29,6 @@ import {
import { getStatusCombinationFromArray } from 'src/modules/workflow/workflow-status/utils/get-status-combination-from-array.util';
import { getStatusCombinationFromUpdate } from 'src/modules/workflow/workflow-status/utils/get-status-combination-from-update.util';
import { getWorkflowStatusesFromCombination } from 'src/modules/workflow/workflow-status/utils/get-statuses-from-combination.util';
import { ServerlessFunctionExceptionCode } from 'src/engine/metadata-modules/serverless-function/serverless-function.exception';
export enum WorkflowVersionEventType {
CREATE = 'CREATE',
@ -66,32 +74,51 @@ export class WorkflowStatusesUpdateJob {
constructor(
private readonly twentyORMManager: TwentyORMManager,
private readonly serverlessFunctionService: ServerlessFunctionService,
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
@InjectRepository(ObjectMetadataEntity, 'metadata')
protected readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
) {}
@Process(WorkflowStatusesUpdateJob.name)
async handle(event: WorkflowVersionBatchEvent): Promise<void> {
const workflowObjectMetadata =
await this.objectMetadataRepository.findOneOrFail({
where: {
nameSingular: 'workflow',
},
});
switch (event.type) {
case WorkflowVersionEventType.CREATE:
await Promise.all(
event.workflowIds.map((workflowId) =>
this.handleWorkflowVersionCreated(workflowId),
this.handleWorkflowVersionCreated({
workflowId,
workflowObjectMetadata,
workspaceId: event.workspaceId,
}),
),
);
break;
case WorkflowVersionEventType.STATUS_UPDATE:
await Promise.all(
event.statusUpdates.map((statusUpdate) =>
this.handleWorkflowVersionStatusUpdated(
this.handleWorkflowVersionStatusUpdated({
statusUpdate,
event.workspaceId,
),
workflowObjectMetadata,
workspaceId: event.workspaceId,
}),
),
);
break;
case WorkflowVersionEventType.DELETE:
await Promise.all(
event.workflowIds.map((workflowId) =>
this.handleWorkflowVersionDeleted(workflowId),
this.handleWorkflowVersionDeleted({
workflowId,
workflowObjectMetadata,
workspaceId: event.workspaceId,
}),
),
);
break;
@ -100,22 +127,28 @@ export class WorkflowStatusesUpdateJob {
}
}
private async handleWorkflowVersionCreated(
workflowId: string,
): Promise<void> {
private async handleWorkflowVersionCreated({
workflowId,
workflowObjectMetadata,
workspaceId,
}: {
workflowId: string;
workflowObjectMetadata: ObjectMetadataEntity;
workspaceId: string;
}): Promise<void> {
const workflowRepository =
await this.twentyORMManager.getRepository<WorkflowWorkspaceEntity>(
'workflow',
);
const workflow = await workflowRepository.findOneOrFail({
const previousWorkflow = await workflowRepository.findOneOrFail({
where: {
id: workflowId,
},
});
const currentWorkflowStatusCombination = getStatusCombinationFromArray(
workflow.statuses || [],
previousWorkflow.statuses || [],
);
const newWorkflowStatusCombination = getStatusCombinationFromUpdate(
@ -128,16 +161,25 @@ export class WorkflowStatusesUpdateJob {
return;
}
const newWorkflowStatuses = getWorkflowStatusesFromCombination(
newWorkflowStatusCombination,
);
await workflowRepository.update(
{
id: workflow.id,
id: workflowId,
},
{
statuses: getWorkflowStatusesFromCombination(
newWorkflowStatusCombination,
),
statuses: newWorkflowStatuses,
},
);
this.emitWorkflowStatusUpdatedEvent({
currentWorkflow: previousWorkflow,
workflowObjectMetadata,
newWorkflowStatuses,
workspaceId,
});
}
private async handlePublishServerlessFunction({
@ -207,10 +249,15 @@ export class WorkflowStatusesUpdateJob {
}
}
private async handleWorkflowVersionStatusUpdated(
statusUpdate: WorkflowVersionStatusUpdate,
workspaceId: string,
): Promise<void> {
private async handleWorkflowVersionStatusUpdated({
statusUpdate,
workflowObjectMetadata,
workspaceId,
}: {
statusUpdate: WorkflowVersionStatusUpdate;
workflowObjectMetadata: ObjectMetadataEntity;
workspaceId: string;
}): Promise<void> {
const workflowRepository =
await this.twentyORMManager.getRepository<WorkflowWorkspaceEntity>(
'workflow',
@ -252,21 +299,36 @@ export class WorkflowStatusesUpdateJob {
return;
}
const newWorkflowStatuses = getWorkflowStatusesFromCombination(
newWorkflowStatusCombination,
);
await workflowRepository.update(
{
id: statusUpdate.workflowId,
},
{
statuses: getWorkflowStatusesFromCombination(
newWorkflowStatusCombination,
),
statuses: newWorkflowStatuses,
},
);
this.emitWorkflowStatusUpdatedEvent({
currentWorkflow: workflow,
workflowObjectMetadata,
newWorkflowStatuses,
workspaceId,
});
}
private async handleWorkflowVersionDeleted(
workflowId: string,
): Promise<void> {
private async handleWorkflowVersionDeleted({
workflowId,
workflowObjectMetadata,
workspaceId,
}: {
workflowId: string;
workflowObjectMetadata: ObjectMetadataEntity;
workspaceId: string;
}): Promise<void> {
const workflowRepository =
await this.twentyORMManager.getRepository<WorkflowWorkspaceEntity>(
'workflow',
@ -292,15 +354,59 @@ export class WorkflowStatusesUpdateJob {
return;
}
const newWorkflowStatuses = getWorkflowStatusesFromCombination(
newWorkflowStatusCombination,
);
await workflowRepository.update(
{
id: workflowId,
},
{
statuses: getWorkflowStatusesFromCombination(
newWorkflowStatusCombination,
),
statuses: newWorkflowStatuses,
},
);
this.emitWorkflowStatusUpdatedEvent({
currentWorkflow: workflow,
workflowObjectMetadata,
newWorkflowStatuses,
workspaceId,
});
}
private emitWorkflowStatusUpdatedEvent({
currentWorkflow,
workflowObjectMetadata,
newWorkflowStatuses,
workspaceId,
}: {
currentWorkflow: WorkflowWorkspaceEntity;
workflowObjectMetadata: ObjectMetadataEntity;
newWorkflowStatuses: WorkflowStatus[];
workspaceId: string;
}) {
this.workspaceEventEmitter.emitDatabaseBatchEvent({
objectMetadataNameSingular: workflowObjectMetadata.nameSingular,
action: DatabaseEventAction.UPDATED,
events: [
{
recordId: currentWorkflow.id,
objectMetadata: workflowObjectMetadata,
properties: {
before: currentWorkflow,
after: {
...currentWorkflow,
statuses: newWorkflowStatuses,
},
updatedFields: ['statuses'],
diff: {
statuses: newWorkflowStatuses,
},
},
},
],
workspaceId,
});
}
}

View File

@ -1,11 +1,18 @@
import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { ServerlessFunctionModule } from 'src/engine/metadata-modules/serverless-function/serverless-function.module';
import { WorkspaceEventEmitterModule } from 'src/engine/workspace-event-emitter/workspace-event-emitter.module';
import { WorkflowStatusesUpdateJob } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job';
import { WorkflowVersionStatusListener } from 'src/modules/workflow/workflow-status/listeners/workflow-version-status.listener';
import { ServerlessFunctionModule } from 'src/engine/metadata-modules/serverless-function/serverless-function.module';
@Module({
imports: [ServerlessFunctionModule],
imports: [
ServerlessFunctionModule,
WorkspaceEventEmitterModule,
TypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'),
],
providers: [WorkflowStatusesUpdateJob, WorkflowVersionStatusListener],
})
export class WorkflowStatusModule {}