bugfix: escape destroyed objects on workers (#9719)

# This PR

- Fixes #9358 

@FelixMalfait please check this workaround

---------

Co-authored-by: Félix Malfait <felix@twenty.com>
This commit is contained in:
P A C - 先生
2025-01-23 17:29:54 +02:00
committed by GitHub
parent bbb0c9a761
commit bbd3af108b
11 changed files with 134 additions and 80 deletions

View File

@ -1,17 +1,21 @@
import { Injectable } from '@nestjs/common';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { ObjectRecordCreateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-create.event';
import { ObjectRecordDeleteEvent } from 'src/engine/core-modules/event-emitter/types/object-record-delete.event';
import { ObjectRecordDestroyEvent } from 'src/engine/core-modules/event-emitter/types/object-record-destroy.event';
import { ObjectRecordEvent } from 'src/engine/core-modules/event-emitter/types/object-record-event.event';
import { ObjectRecordNonDestructiveEvent } from 'src/engine/core-modules/event-emitter/types/object-record-non-destructive-event';
import { ObjectRecordRestoreEvent } from 'src/engine/core-modules/event-emitter/types/object-record-restore.event';
import { ObjectRecordUpdateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-update.event';
import { ObjectRecordBaseEvent } from 'src/engine/core-modules/event-emitter/types/object-record.base.event';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { CreateAuditLogFromInternalEvent } from 'src/modules/timeline/jobs/create-audit-log-from-internal-event';
import { UpsertTimelineActivityFromInternalEvent } from 'src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { CallWebhookJobsJob } from 'src/modules/webhook/jobs/call-webhook-jobs.job';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable()
export class EntityEventsToDbListener {
@ -24,47 +28,66 @@ export class EntityEventsToDbListener {
@OnDatabaseBatchEvent('*', DatabaseEventAction.CREATED)
async handleCreate(batchEvent: WorkspaceEventBatch<ObjectRecordCreateEvent>) {
return this.handle(batchEvent);
return this.handleEvent(batchEvent, DatabaseEventAction.CREATED);
}
@OnDatabaseBatchEvent('*', DatabaseEventAction.UPDATED)
async handleUpdate(batchEvent: WorkspaceEventBatch<ObjectRecordUpdateEvent>) {
return this.handle(batchEvent);
return this.handleEvent(batchEvent, DatabaseEventAction.UPDATED);
}
@OnDatabaseBatchEvent('*', DatabaseEventAction.DELETED)
async handleDelete(batchEvent: WorkspaceEventBatch<ObjectRecordUpdateEvent>) {
return this.handle(batchEvent);
async handleDelete(batchEvent: WorkspaceEventBatch<ObjectRecordDeleteEvent>) {
return this.handleEvent(batchEvent, DatabaseEventAction.DELETED);
}
@OnDatabaseBatchEvent('*', DatabaseEventAction.RESTORED)
async handleRestore(
batchEvent: WorkspaceEventBatch<ObjectRecordRestoreEvent>,
) {
return this.handleEvent(batchEvent, DatabaseEventAction.RESTORED);
}
@OnDatabaseBatchEvent('*', DatabaseEventAction.DESTROYED)
async handleDestroy(
batchEvent: WorkspaceEventBatch<ObjectRecordUpdateEvent>,
batchEvent: WorkspaceEventBatch<ObjectRecordDestroyEvent>,
) {
return this.handle(batchEvent);
return this.handleEvent(batchEvent, DatabaseEventAction.DESTROYED);
}
private async handle(batchEvent: WorkspaceEventBatch<ObjectRecordBaseEvent>) {
private async handleEvent<T extends ObjectRecordEvent>(
batchEvent: WorkspaceEventBatch<T>,
action: DatabaseEventAction,
) {
const filteredEvents = batchEvent.events.filter(
(event) => event.objectMetadata?.isAuditLogged,
);
await this.entityEventsToDbQueueService.add<
WorkspaceEventBatch<ObjectRecordBaseEvent>
>(CreateAuditLogFromInternalEvent.name, {
...batchEvent,
events: filteredEvents,
});
await this.entityEventsToDbQueueService.add<
WorkspaceEventBatch<ObjectRecordBaseEvent>
>(UpsertTimelineActivityFromInternalEvent.name, {
...batchEvent,
events: filteredEvents,
});
await this.webhookQueueService.add<
WorkspaceEventBatch<ObjectRecordBaseEvent>
>(CallWebhookJobsJob.name, batchEvent, { retryLimit: 3 });
await Promise.all([
this.webhookQueueService.add<WorkspaceEventBatch<T>>(
CallWebhookJobsJob.name,
batchEvent,
{
retryLimit: 3,
},
),
this.entityEventsToDbQueueService.add<WorkspaceEventBatch<T>>(
CreateAuditLogFromInternalEvent.name,
{
...batchEvent,
events: filteredEvents,
},
),
...(action !== DatabaseEventAction.DESTROYED
? [
this.entityEventsToDbQueueService.add<
WorkspaceEventBatch<ObjectRecordNonDestructiveEvent>
>(UpsertTimelineActivityFromInternalEvent.name, {
...batchEvent,
events: filteredEvents,
}),
]
: []),
]);
}
}