6619 modify event emitter to emit an array of events (#6625)

Closes #6619

---------

Co-authored-by: Charles Bochet <charles@twenty.com>
This commit is contained in:
Raphaël Bosi
2024-08-20 19:44:29 +02:00
committed by GitHub
parent 17a1760afd
commit 091c0f83be
41 changed files with 1005 additions and 722 deletions

View File

@ -1,12 +1,13 @@
import { ObjectRecordBaseEvent } from 'src/engine/integrations/event-emitter/types/object-record.base.event';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { AuditLogRepository } from 'src/modules/timeline/repositiories/audit-log.repository';
import { AuditLogWorkspaceEntity } from 'src/modules/timeline/standard-objects/audit-log.workspace-entity';
import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository';
import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity';
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { Process } from 'src/engine/integrations/message-queue/decorators/process.decorator';
@Processor(MessageQueue.entityEventsToDbQueue)
export class CreateAuditLogFromInternalEvent {
@ -18,33 +19,37 @@ export class CreateAuditLogFromInternalEvent {
) {}
@Process(CreateAuditLogFromInternalEvent.name)
async handle(data: ObjectRecordBaseEvent): Promise<void> {
let workspaceMemberId: string | null = null;
async handle(
data: WorkspaceEventBatch<ObjectRecordBaseEvent>,
): Promise<void> {
for (const eventData of data.events) {
let workspaceMemberId: string | null = null;
if (data.userId) {
const workspaceMember = await this.workspaceMemberService.getByIdOrFail(
data.userId,
if (eventData.userId) {
const workspaceMember = await this.workspaceMemberService.getByIdOrFail(
eventData.userId,
data.workspaceId,
);
workspaceMemberId = workspaceMember.id;
}
if (eventData.properties.diff) {
// we remove "before" and "after" property for a cleaner/slimmer event payload
eventData.properties = {
diff: eventData.properties.diff,
};
}
await this.auditLogRepository.insert(
data.name,
eventData.properties,
workspaceMemberId,
data.name.split('.')[0],
eventData.objectMetadata.id,
eventData.recordId,
data.workspaceId,
);
workspaceMemberId = workspaceMember.id;
}
if (data.properties.diff) {
// we remove "before" and "after" property for a cleaner/slimmer event payload
data.properties = {
diff: data.properties.diff,
};
}
await this.auditLogRepository.insert(
data.name,
data.properties,
workspaceMemberId,
data.name.split('.')[0],
data.objectMetadata.id,
data.recordId,
data.workspaceId,
);
}
}

View File

@ -3,6 +3,7 @@ import { Process } from 'src/engine/integrations/message-queue/decorators/proces
import { Processor } from 'src/engine/integrations/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/integrations/message-queue/message-queue.constants';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { TimelineActivityService } from 'src/modules/timeline/services/timeline-activity.service';
import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository';
import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity';
@ -16,33 +17,41 @@ export class UpsertTimelineActivityFromInternalEvent {
) {}
@Process(UpsertTimelineActivityFromInternalEvent.name)
async handle(data: ObjectRecordBaseEvent): Promise<void> {
if (data.userId) {
const workspaceMember = await this.workspaceMemberService.getByIdOrFail(
data.userId,
data.workspaceId,
);
async handle(
data: WorkspaceEventBatch<ObjectRecordBaseEvent>,
): Promise<void> {
for (const eventData of data.events) {
if (eventData.userId) {
const workspaceMember = await this.workspaceMemberService.getByIdOrFail(
eventData.userId,
data.workspaceId,
);
data.workspaceMemberId = workspaceMember.id;
eventData.workspaceMemberId = workspaceMember.id;
}
if (eventData.properties.diff) {
// we remove "before" and "after" property for a cleaner/slimmer event payload
eventData.properties = {
diff: eventData.properties.diff,
};
}
// Temporary
// We ignore every that is not a LinkedObject or a Business Object
if (
eventData.objectMetadata.isSystem &&
eventData.objectMetadata.nameSingular !== 'noteTarget' &&
eventData.objectMetadata.nameSingular !== 'taskTarget'
) {
continue;
}
await this.timelineActivityService.upsertEvent({
...eventData,
workspaceId: data.workspaceId,
name: data.name,
});
}
if (data.properties.diff) {
// we remove "before" and "after" property for a cleaner/slimmer event payload
data.properties = {
diff: data.properties.diff,
};
}
// Temporary
// We ignore every that is not a LinkedObject or a Business Object
if (
data.objectMetadata.isSystem &&
data.objectMetadata.nameSingular !== 'noteTarget' &&
data.objectMetadata.nameSingular !== 'taskTarget'
) {
return;
}
await this.timelineActivityService.upsertEvent(data);
}
}

View File

@ -1,12 +1,12 @@
import { Injectable } from '@nestjs/common';
import { ObjectRecordBaseEvent } from 'src/engine/integrations/event-emitter/types/object-record.base.event';
import { ObjectRecordBaseEventWithNameAndWorkspaceId } from 'src/engine/integrations/event-emitter/types/object-record.base.event';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { TimelineActivityRepository } from 'src/modules/timeline/repositiories/timeline-activity.repository';
import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity';
type TransformedEvent = ObjectRecordBaseEvent & {
type TransformedEvent = ObjectRecordBaseEventWithNameAndWorkspaceId & {
objectName?: string;
linkedRecordCachedName?: string;
linkedRecordId?: string;
@ -26,7 +26,7 @@ export class TimelineActivityService {
task: 'taskTarget',
};
async upsertEvent(event: ObjectRecordBaseEvent) {
async upsertEvent(event: ObjectRecordBaseEventWithNameAndWorkspaceId) {
const events = await this.transformEvent(event);
if (!events || events.length === 0) return;
@ -47,7 +47,7 @@ export class TimelineActivityService {
}
private async transformEvent(
event: ObjectRecordBaseEvent,
event: ObjectRecordBaseEventWithNameAndWorkspaceId,
): Promise<TransformedEvent[]> {
if (['note', 'task'].includes(event.objectMetadata.nameSingular)) {
const linkedObjects = await this.handleLinkedObjects(event);
@ -69,7 +69,9 @@ export class TimelineActivityService {
return [event];
}
private async handleLinkedObjects(event: ObjectRecordBaseEvent) {
private async handleLinkedObjects(
event: ObjectRecordBaseEventWithNameAndWorkspaceId,
) {
const dataSourceSchema = this.workspaceDataSourceService.getSchemaName(
event.workspaceId,
);
@ -92,7 +94,7 @@ export class TimelineActivityService {
}
private async processActivity(
event: ObjectRecordBaseEvent,
event: ObjectRecordBaseEventWithNameAndWorkspaceId,
dataSourceSchema: string,
activityType: string,
) {
@ -145,7 +147,7 @@ export class TimelineActivityService {
}
private async processActivityTarget(
event: ObjectRecordBaseEvent,
event: ObjectRecordBaseEventWithNameAndWorkspaceId,
dataSourceSchema: string,
activityType: string,
) {