diff --git a/packages/twenty-front/src/modules/object-record/record-table/record-table-row/components/RecordTableRow.tsx b/packages/twenty-front/src/modules/object-record/record-table/record-table-row/components/RecordTableRow.tsx index 849d4af58..b5acdedf9 100644 --- a/packages/twenty-front/src/modules/object-record/record-table/record-table-row/components/RecordTableRow.tsx +++ b/packages/twenty-front/src/modules/object-record/record-table/record-table-row/components/RecordTableRow.tsx @@ -1,9 +1,12 @@ +import { useRecordIndexContextOrThrow } from '@/object-record/record-index/contexts/RecordIndexContext'; import { RecordValueSetterEffect } from '@/object-record/record-store/components/RecordValueSetterEffect'; import { RecordTableCellCheckbox } from '@/object-record/record-table/record-table-cell/components/RecordTableCellCheckbox'; import { RecordTableCellGrip } from '@/object-record/record-table/record-table-cell/components/RecordTableCellGrip'; import { RecordTableLastEmptyCell } from '@/object-record/record-table/record-table-cell/components/RecordTableLastEmptyCell'; import { RecordTableCells } from '@/object-record/record-table/record-table-row/components/RecordTableCells'; import { RecordTableDraggableTr } from '@/object-record/record-table/record-table-row/components/RecordTableDraggableTr'; +import { ListenRecordUpdatesEffect } from '@/subscription/components/ListenUpdatesEffect'; +import { getDefaultRecordFieldsToListen } from '@/subscription/utils/getDefaultRecordFieldsToListen.util'; type RecordTableRowProps = { recordId: string; @@ -16,6 +19,11 @@ export const RecordTableRow = ({ rowIndexForFocus, rowIndexForDrag, }: RecordTableRowProps) => { + const { objectNameSingular } = useRecordIndexContextOrThrow(); + const listenedFields = getDefaultRecordFieldsToListen({ + objectNameSingular, + }); + return ( + ); }; diff --git a/packages/twenty-front/src/modules/subscription/components/ListenUpdatesEffect.tsx b/packages/twenty-front/src/modules/subscription/components/ListenUpdatesEffect.tsx index f740a04f8..2d70e8c4e 100644 --- a/packages/twenty-front/src/modules/subscription/components/ListenUpdatesEffect.tsx +++ b/packages/twenty-front/src/modules/subscription/components/ListenUpdatesEffect.tsx @@ -1,7 +1,7 @@ -import { useApolloClient } from '@apollo/client'; import { useOnDbEvent } from '@/subscription/hooks/useOnDbEvent'; -import { DatabaseEventAction } from '~/generated/graphql'; +import { useApolloClient } from '@apollo/client'; import { capitalize, isDefined } from 'twenty-shared/utils'; +import { DatabaseEventAction } from '~/generated/graphql'; type ListenRecordUpdatesEffectProps = { objectNameSingular: string; @@ -39,6 +39,7 @@ export const ListenRecordUpdatesEffect = ({ fields: fieldsUpdater, }); }, + skip: listenedFields.length === 0, }); return null; diff --git a/packages/twenty-front/src/modules/subscription/utils/getDefaultRecordFieldsToListen.util.ts b/packages/twenty-front/src/modules/subscription/utils/getDefaultRecordFieldsToListen.util.ts new file mode 100644 index 000000000..dd119e69a --- /dev/null +++ b/packages/twenty-front/src/modules/subscription/utils/getDefaultRecordFieldsToListen.util.ts @@ -0,0 +1,14 @@ +import { CoreObjectNameSingular } from '@/object-metadata/types/CoreObjectNameSingular'; + +export const getDefaultRecordFieldsToListen = ({ + objectNameSingular, +}: { + objectNameSingular: string; +}) => { + switch (objectNameSingular) { + case CoreObjectNameSingular.Workflow: + return ['statuses']; + default: + return []; + } +}; diff --git a/packages/twenty-server/src/engine/subscriptions/subscriptions.job.ts b/packages/twenty-server/src/engine/subscriptions/subscriptions.job.ts index 5a88b2404..402bf93dc 100644 --- a/packages/twenty-server/src/engine/subscriptions/subscriptions.job.ts +++ b/packages/twenty-server/src/engine/subscriptions/subscriptions.job.ts @@ -1,13 +1,13 @@ import { Inject } from '@nestjs/common'; -import { isDefined } from 'twenty-shared/utils'; import { RedisPubSub } from 'graphql-redis-subscriptions'; +import { isDefined } from 'twenty-shared/utils'; -import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; -import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; -import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; -import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type'; import { ObjectRecordEvent } from 'src/engine/core-modules/event-emitter/types/object-record-event.event'; +import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; +import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type'; import { removeSecretFromWebhookRecord } from 'src/utils/remove-secret-from-webhook-record'; @Processor(MessageQueue.subscriptionsQueue) diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts index 5db63d40c..9a4e34a1f 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/__tests__/workflow-statuses-update.job.spec.ts @@ -1,6 +1,10 @@ import { Test, TestingModule } from '@nestjs/testing'; +import { getRepositoryToken } from '@nestjs/typeorm'; +import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; +import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { WorkflowVersionStatus } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; import { WorkflowStatus } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { @@ -8,7 +12,6 @@ import { WorkflowVersionBatchEvent, WorkflowVersionEventType, } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; -import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service'; describe('WorkflowStatusesUpdate', () => { let job: WorkflowStatusesUpdateJob; @@ -26,6 +29,10 @@ describe('WorkflowStatusesUpdate', () => { publishOneServerlessFunction: jest.fn(), }; + const mockWorkspaceEventEmitter = { + emitDatabaseBatchEvent: jest.fn(), + }; + beforeEach(async () => { const module: TestingModule = await Test.createTestingModule({ providers: [ @@ -38,6 +45,18 @@ describe('WorkflowStatusesUpdate', () => { provide: ServerlessFunctionService, useValue: mockServerlessFunctionService, }, + { + provide: WorkspaceEventEmitter, + useValue: mockWorkspaceEventEmitter, + }, + { + provide: getRepositoryToken(ObjectMetadataEntity, 'metadata'), + useValue: { + findOneOrFail: jest.fn().mockResolvedValue({ + nameSingular: 'workflow', + }), + }, + }, ], }).compile(); @@ -69,6 +88,9 @@ describe('WorkflowStatusesUpdate', () => { expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(1); expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0); + expect( + mockWorkspaceEventEmitter.emitDatabaseBatchEvent, + ).toHaveBeenCalledTimes(0); }); it('when no draft yet, update statuses', async () => { @@ -92,6 +114,9 @@ describe('WorkflowStatusesUpdate', () => { { id: '1' }, { statuses: [WorkflowStatus.ACTIVE, WorkflowStatus.DRAFT] }, ); + expect( + mockWorkspaceEventEmitter.emitDatabaseBatchEvent, + ).toHaveBeenCalledTimes(1); }); }); @@ -120,6 +145,9 @@ describe('WorkflowStatusesUpdate', () => { expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2); expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0); + expect( + mockWorkspaceEventEmitter.emitDatabaseBatchEvent, + ).toHaveBeenCalledTimes(0); }); test('when update that should be impossible, do not do anything', async () => { @@ -146,6 +174,9 @@ describe('WorkflowStatusesUpdate', () => { expect(mockWorkflowRepository.findOneOrFail).toHaveBeenCalledTimes(2); expect(mockWorkflowRepository.update).toHaveBeenCalledTimes(0); + expect( + mockWorkspaceEventEmitter.emitDatabaseBatchEvent, + ).toHaveBeenCalledTimes(0); }); test('when WorkflowVersionStatus.DEACTIVATED to WorkflowVersionStatus.ACTIVE, should activate', async () => { @@ -175,6 +206,9 @@ describe('WorkflowStatusesUpdate', () => { { id: '1' }, { statuses: [WorkflowStatus.ACTIVE] }, ); + expect( + mockWorkspaceEventEmitter.emitDatabaseBatchEvent, + ).toHaveBeenCalledTimes(1); }); test('when WorkflowVersionStatus.ACTIVE to WorkflowVersionStatus.DEACTIVATED, should deactivate', async () => { @@ -204,6 +238,9 @@ describe('WorkflowStatusesUpdate', () => { { id: '1' }, { statuses: [WorkflowStatus.DEACTIVATED] }, ); + expect( + mockWorkspaceEventEmitter.emitDatabaseBatchEvent, + ).toHaveBeenCalledTimes(1); }); test('when WorkflowVersionStatus.DRAFT to WorkflowVersionStatus.ACTIVE, should activate', async () => { @@ -233,6 +270,9 @@ describe('WorkflowStatusesUpdate', () => { { id: '1' }, { statuses: [WorkflowStatus.ACTIVE] }, ); + expect( + mockWorkspaceEventEmitter.emitDatabaseBatchEvent, + ).toHaveBeenCalledTimes(1); }); }); @@ -276,6 +316,9 @@ describe('WorkflowStatusesUpdate', () => { { id: '1' }, { statuses: [] }, ); + expect( + mockWorkspaceEventEmitter.emitDatabaseBatchEvent, + ).toHaveBeenCalledTimes(1); }); }); }); diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts index a63f33249..820d6cf16 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job.ts @@ -1,18 +1,27 @@ import { Logger, Scope } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; import { isDefined } from 'twenty-shared/utils'; +import { Repository } from 'typeorm'; +import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; +import { ServerlessFunctionExceptionCode } from 'src/engine/metadata-modules/serverless-function/serverless-function.exception'; import { ServerlessFunctionService } from 'src/engine/metadata-modules/serverless-function/serverless-function.service'; import { WorkspaceRepository } from 'src/engine/twenty-orm/repository/workspace.repository'; import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { WorkflowVersionStatus, WorkflowVersionWorkspaceEntity, } from 'src/modules/workflow/common/standard-objects/workflow-version.workspace-entity'; -import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; +import { + WorkflowStatus, + WorkflowWorkspaceEntity, +} from 'src/modules/workflow/common/standard-objects/workflow.workspace-entity'; import { WorkflowAction, WorkflowActionType, @@ -20,7 +29,6 @@ import { import { getStatusCombinationFromArray } from 'src/modules/workflow/workflow-status/utils/get-status-combination-from-array.util'; import { getStatusCombinationFromUpdate } from 'src/modules/workflow/workflow-status/utils/get-status-combination-from-update.util'; import { getWorkflowStatusesFromCombination } from 'src/modules/workflow/workflow-status/utils/get-statuses-from-combination.util'; -import { ServerlessFunctionExceptionCode } from 'src/engine/metadata-modules/serverless-function/serverless-function.exception'; export enum WorkflowVersionEventType { CREATE = 'CREATE', @@ -66,32 +74,51 @@ export class WorkflowStatusesUpdateJob { constructor( private readonly twentyORMManager: TwentyORMManager, private readonly serverlessFunctionService: ServerlessFunctionService, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, + @InjectRepository(ObjectMetadataEntity, 'metadata') + protected readonly objectMetadataRepository: Repository, ) {} @Process(WorkflowStatusesUpdateJob.name) async handle(event: WorkflowVersionBatchEvent): Promise { + const workflowObjectMetadata = + await this.objectMetadataRepository.findOneOrFail({ + where: { + nameSingular: 'workflow', + }, + }); + switch (event.type) { case WorkflowVersionEventType.CREATE: await Promise.all( event.workflowIds.map((workflowId) => - this.handleWorkflowVersionCreated(workflowId), + this.handleWorkflowVersionCreated({ + workflowId, + workflowObjectMetadata, + workspaceId: event.workspaceId, + }), ), ); break; case WorkflowVersionEventType.STATUS_UPDATE: await Promise.all( event.statusUpdates.map((statusUpdate) => - this.handleWorkflowVersionStatusUpdated( + this.handleWorkflowVersionStatusUpdated({ statusUpdate, - event.workspaceId, - ), + workflowObjectMetadata, + workspaceId: event.workspaceId, + }), ), ); break; case WorkflowVersionEventType.DELETE: await Promise.all( event.workflowIds.map((workflowId) => - this.handleWorkflowVersionDeleted(workflowId), + this.handleWorkflowVersionDeleted({ + workflowId, + workflowObjectMetadata, + workspaceId: event.workspaceId, + }), ), ); break; @@ -100,22 +127,28 @@ export class WorkflowStatusesUpdateJob { } } - private async handleWorkflowVersionCreated( - workflowId: string, - ): Promise { + private async handleWorkflowVersionCreated({ + workflowId, + workflowObjectMetadata, + workspaceId, + }: { + workflowId: string; + workflowObjectMetadata: ObjectMetadataEntity; + workspaceId: string; + }): Promise { const workflowRepository = await this.twentyORMManager.getRepository( 'workflow', ); - const workflow = await workflowRepository.findOneOrFail({ + const previousWorkflow = await workflowRepository.findOneOrFail({ where: { id: workflowId, }, }); const currentWorkflowStatusCombination = getStatusCombinationFromArray( - workflow.statuses || [], + previousWorkflow.statuses || [], ); const newWorkflowStatusCombination = getStatusCombinationFromUpdate( @@ -128,16 +161,25 @@ export class WorkflowStatusesUpdateJob { return; } + const newWorkflowStatuses = getWorkflowStatusesFromCombination( + newWorkflowStatusCombination, + ); + await workflowRepository.update( { - id: workflow.id, + id: workflowId, }, { - statuses: getWorkflowStatusesFromCombination( - newWorkflowStatusCombination, - ), + statuses: newWorkflowStatuses, }, ); + + this.emitWorkflowStatusUpdatedEvent({ + currentWorkflow: previousWorkflow, + workflowObjectMetadata, + newWorkflowStatuses, + workspaceId, + }); } private async handlePublishServerlessFunction({ @@ -207,10 +249,15 @@ export class WorkflowStatusesUpdateJob { } } - private async handleWorkflowVersionStatusUpdated( - statusUpdate: WorkflowVersionStatusUpdate, - workspaceId: string, - ): Promise { + private async handleWorkflowVersionStatusUpdated({ + statusUpdate, + workflowObjectMetadata, + workspaceId, + }: { + statusUpdate: WorkflowVersionStatusUpdate; + workflowObjectMetadata: ObjectMetadataEntity; + workspaceId: string; + }): Promise { const workflowRepository = await this.twentyORMManager.getRepository( 'workflow', @@ -252,21 +299,36 @@ export class WorkflowStatusesUpdateJob { return; } + const newWorkflowStatuses = getWorkflowStatusesFromCombination( + newWorkflowStatusCombination, + ); + await workflowRepository.update( { id: statusUpdate.workflowId, }, { - statuses: getWorkflowStatusesFromCombination( - newWorkflowStatusCombination, - ), + statuses: newWorkflowStatuses, }, ); + + this.emitWorkflowStatusUpdatedEvent({ + currentWorkflow: workflow, + workflowObjectMetadata, + newWorkflowStatuses, + workspaceId, + }); } - private async handleWorkflowVersionDeleted( - workflowId: string, - ): Promise { + private async handleWorkflowVersionDeleted({ + workflowId, + workflowObjectMetadata, + workspaceId, + }: { + workflowId: string; + workflowObjectMetadata: ObjectMetadataEntity; + workspaceId: string; + }): Promise { const workflowRepository = await this.twentyORMManager.getRepository( 'workflow', @@ -292,15 +354,59 @@ export class WorkflowStatusesUpdateJob { return; } + const newWorkflowStatuses = getWorkflowStatusesFromCombination( + newWorkflowStatusCombination, + ); + await workflowRepository.update( { id: workflowId, }, { - statuses: getWorkflowStatusesFromCombination( - newWorkflowStatusCombination, - ), + statuses: newWorkflowStatuses, }, ); + + this.emitWorkflowStatusUpdatedEvent({ + currentWorkflow: workflow, + workflowObjectMetadata, + newWorkflowStatuses, + workspaceId, + }); + } + + private emitWorkflowStatusUpdatedEvent({ + currentWorkflow, + workflowObjectMetadata, + newWorkflowStatuses, + workspaceId, + }: { + currentWorkflow: WorkflowWorkspaceEntity; + workflowObjectMetadata: ObjectMetadataEntity; + newWorkflowStatuses: WorkflowStatus[]; + workspaceId: string; + }) { + this.workspaceEventEmitter.emitDatabaseBatchEvent({ + objectMetadataNameSingular: workflowObjectMetadata.nameSingular, + action: DatabaseEventAction.UPDATED, + events: [ + { + recordId: currentWorkflow.id, + objectMetadata: workflowObjectMetadata, + properties: { + before: currentWorkflow, + after: { + ...currentWorkflow, + statuses: newWorkflowStatuses, + }, + updatedFields: ['statuses'], + diff: { + statuses: newWorkflowStatuses, + }, + }, + }, + ], + workspaceId, + }); } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts index f7a4a841e..ba54f1ef3 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-status/workflow-status.module.ts @@ -1,11 +1,18 @@ import { Module } from '@nestjs/common'; +import { TypeOrmModule } from '@nestjs/typeorm'; +import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; +import { ServerlessFunctionModule } from 'src/engine/metadata-modules/serverless-function/serverless-function.module'; +import { WorkspaceEventEmitterModule } from 'src/engine/workspace-event-emitter/workspace-event-emitter.module'; import { WorkflowStatusesUpdateJob } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job'; import { WorkflowVersionStatusListener } from 'src/modules/workflow/workflow-status/listeners/workflow-version-status.listener'; -import { ServerlessFunctionModule } from 'src/engine/metadata-modules/serverless-function/serverless-function.module'; @Module({ - imports: [ServerlessFunctionModule], + imports: [ + ServerlessFunctionModule, + WorkspaceEventEmitterModule, + TypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'), + ], providers: [WorkflowStatusesUpdateJob, WorkflowVersionStatusListener], }) export class WorkflowStatusModule {}