8643 fix sentry error (#8644)

- fixes missing data in event payload when adding a new workspaceMember
- add strong typing to database event emitters
This commit is contained in:
martmull
2024-11-21 17:09:36 +01:00
committed by GitHub
parent 395da91071
commit 39373b4a28
61 changed files with 460 additions and 311 deletions

View File

@ -7,7 +7,7 @@ import { Process } from 'src/engine/core-modules/message-queue/decorators/proces
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import { CalendarEventCleanerService } from 'src/modules/calendar/calendar-event-cleaner/services/calendar-event-cleaner.service';
import { CalendarChannelEventAssociationWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-channel-event-association.workspace-entity';

View File

@ -7,7 +7,7 @@ import { Process } from 'src/engine/core-modules/message-queue/decorators/proces
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import { CalendarChannelSyncStatusService } from 'src/modules/calendar/common/services/calendar-channel-sync-status.service';
import {

View File

@ -6,7 +6,7 @@ import { ObjectRecordUpdateEvent } from 'src/engine/core-modules/event-emitter/t
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import {
BlocklistItemDeleteCalendarEventsJob,
@ -16,7 +16,7 @@ import {
BlocklistReimportCalendarEventsJob,
BlocklistReimportCalendarEventsJobData,
} from 'src/modules/calendar/blocklist-manager/jobs/blocklist-reimport-calendar-events.job';
import { OnDatabaseEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-event.decorator';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable()
@ -26,7 +26,7 @@ export class CalendarBlocklistListener {
private readonly messageQueueService: MessageQueueService,
) {}
@OnDatabaseEvent('blocklist', DatabaseEventAction.CREATED)
@OnDatabaseBatchEvent('blocklist', DatabaseEventAction.CREATED)
async handleCreatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordCreateEvent<BlocklistWorkspaceEntity>
@ -38,7 +38,7 @@ export class CalendarBlocklistListener {
);
}
@OnDatabaseEvent('blocklist', DatabaseEventAction.DELETED)
@OnDatabaseBatchEvent('blocklist', DatabaseEventAction.DELETED)
async handleDeletedEvent(
payload: WorkspaceEventBatch<
ObjectRecordDeleteEvent<BlocklistWorkspaceEntity>
@ -50,7 +50,7 @@ export class CalendarBlocklistListener {
);
}
@OnDatabaseEvent('blocklist', DatabaseEventAction.UPDATED)
@OnDatabaseBatchEvent('blocklist', DatabaseEventAction.UPDATED)
async handleUpdatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordUpdateEvent<BlocklistWorkspaceEntity>

View File

@ -4,13 +4,13 @@ import { ObjectRecordDeleteEvent } from 'src/engine/core-modules/event-emitter/t
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import {
DeleteConnectedAccountAssociatedCalendarDataJob,
DeleteConnectedAccountAssociatedCalendarDataJobData,
} from 'src/modules/calendar/calendar-event-cleaner/jobs/delete-connected-account-associated-calendar-data.job';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { OnDatabaseEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-event.decorator';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable()
@ -20,7 +20,7 @@ export class CalendarEventCleanerConnectedAccountListener {
private readonly calendarQueueService: MessageQueueService,
) {}
@OnDatabaseEvent('connectedAccount', DatabaseEventAction.DESTROYED)
@OnDatabaseBatchEvent('connectedAccount', DatabaseEventAction.DESTROYED)
async handleDestroyedEvent(
payload: WorkspaceEventBatch<
ObjectRecordDeleteEvent<ConnectedAccountWorkspaceEntity>

View File

@ -6,7 +6,7 @@ import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperti
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import {
CalendarEventParticipantMatchParticipantJob,
CalendarEventParticipantMatchParticipantJobData,
@ -16,7 +16,7 @@ import {
CalendarEventParticipantUnmatchParticipantJobData,
} from 'src/modules/calendar/calendar-event-participant-manager/jobs/calendar-event-participant-unmatch-participant.job';
import { PersonWorkspaceEntity } from 'src/modules/person/standard-objects/person.workspace-entity';
import { OnDatabaseEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-event.decorator';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable()
@ -26,7 +26,7 @@ export class CalendarEventParticipantPersonListener {
private readonly messageQueueService: MessageQueueService,
) {}
@OnDatabaseEvent('person', DatabaseEventAction.CREATED)
@OnDatabaseBatchEvent('person', DatabaseEventAction.CREATED)
async handleCreatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordCreateEvent<PersonWorkspaceEntity>
@ -49,7 +49,7 @@ export class CalendarEventParticipantPersonListener {
}
}
@OnDatabaseEvent('person', DatabaseEventAction.UPDATED)
@OnDatabaseBatchEvent('person', DatabaseEventAction.UPDATED)
async handleUpdatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordUpdateEvent<PersonWorkspaceEntity>

View File

@ -6,7 +6,7 @@ import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperti
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import {
CalendarEventParticipantMatchParticipantJob,
CalendarEventParticipantMatchParticipantJobData,
@ -16,7 +16,7 @@ import {
CalendarEventParticipantUnmatchParticipantJobData,
} from 'src/modules/calendar/calendar-event-participant-manager/jobs/calendar-event-participant-unmatch-participant.job';
import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity';
import { OnDatabaseEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-event.decorator';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable()
@ -26,7 +26,7 @@ export class CalendarEventParticipantWorkspaceMemberListener {
private readonly messageQueueService: MessageQueueService,
) {}
@OnDatabaseEvent('workspaceMember', DatabaseEventAction.CREATED)
@OnDatabaseBatchEvent('workspaceMember', DatabaseEventAction.CREATED)
async handleCreatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordCreateEvent<WorkspaceMemberWorkspaceEntity>
@ -48,7 +48,7 @@ export class CalendarEventParticipantWorkspaceMemberListener {
}
}
@OnDatabaseEvent('workspaceMember', DatabaseEventAction.UPDATED)
@OnDatabaseBatchEvent('workspaceMember', DatabaseEventAction.UPDATED)
async handleUpdatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordUpdateEvent<WorkspaceMemberWorkspaceEntity>

View File

@ -1,5 +1,4 @@
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
@ -7,10 +6,11 @@ import { Repository } from 'typeorm';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { CalendarEventParticipantWorkspaceEntity } from 'src/modules/calendar/common/standard-objects/calendar-event-participant.workspace-entity';
import { TimelineActivityRepository } from 'src/modules/timeline/repositiories/timeline-activity.repository';
import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity';
import { OnCustomBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-custom-batch-event.decorator';
@Injectable()
export class CalendarEventParticipantListener {
@ -22,17 +22,17 @@ export class CalendarEventParticipantListener {
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
) {}
@OnEvent('calendarEventParticipant.matched')
@OnCustomBatchEvent('calendarEventParticipant_matched')
public async handleCalendarEventParticipantMatchedEvent(
payload: WorkspaceEventBatch<{
batchEvent: WorkspaceEventBatch<{
workspaceMemberId: string;
participants: CalendarEventParticipantWorkspaceEntity[];
}>,
): Promise<void> {
const workspaceId = payload.workspaceId;
const workspaceId = batchEvent.workspaceId;
// TODO: Refactor to insertTimelineActivitiesForObject once
for (const eventPayload of payload.events) {
for (const eventPayload of batchEvent.events) {
const calendarEventParticipants = eventPayload.participants;
const workspaceMemberId = eventPayload.workspaceMemberId;

View File

@ -2,11 +2,11 @@ import { Injectable } from '@nestjs/common';
import { ObjectRecordDeleteEvent } from 'src/engine/core-modules/event-emitter/types/object-record-delete.event';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { AccountsToReconnectService } from 'src/modules/connected-account/services/accounts-to-reconnect.service';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity';
import { OnDatabaseEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-event.decorator';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable()
@ -16,7 +16,7 @@ export class ConnectedAccountListener {
private readonly accountsToReconnectService: AccountsToReconnectService,
) {}
@OnDatabaseEvent('connectedAccount', DatabaseEventAction.DESTROYED)
@OnDatabaseBatchEvent('connectedAccount', DatabaseEventAction.DESTROYED)
async handleDestroyedEvent(
payload: WorkspaceEventBatch<
ObjectRecordDeleteEvent<ConnectedAccountWorkspaceEntity>

View File

@ -1,13 +1,17 @@
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { WorkspaceQueryHookInstance } from 'src/engine/api/graphql/workspace-query-runner/workspace-query-hook/interfaces/workspace-query-hook.interface';
import { DeleteOneResolverArgs } from 'src/engine/api/graphql/workspace-resolver-builder/interfaces/workspace-resolvers-builder.interface';
import { WorkspaceQueryHook } from 'src/engine/api/graphql/workspace-query-runner/workspace-query-hook/decorators/workspace-query-hook.decorator';
import { AuthContext } from 'src/engine/core-modules/auth/types/auth-context.type';
import { ObjectRecordDeleteEvent } from 'src/engine/core-modules/event-emitter/types/object-record-delete.event';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
@WorkspaceQueryHook(`connectedAccount.destroyOne`)
export class ConnectedAccountDeleteOnePreQueryHook
@ -16,6 +20,8 @@ export class ConnectedAccountDeleteOnePreQueryHook
constructor(
private readonly twentyORMManager: TwentyORMManager,
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
@InjectRepository(ObjectMetadataEntity, 'metadata')
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
) {}
async execute(
@ -34,19 +40,24 @@ export class ConnectedAccountDeleteOnePreQueryHook
connectedAccountId,
});
this.workspaceEventEmitter.emit(
`messageChannel.${DatabaseEventAction.DESTROYED}`,
messageChannels.map(
(messageChannel) =>
({
recordId: messageChannel.id,
}) satisfies Pick<
ObjectRecordDeleteEvent<MessageChannelWorkspaceEntity>,
'recordId'
>,
),
authContext.workspace.id,
);
const objectMetadata = await this.objectMetadataRepository.findOneOrFail({
where: {
nameSingular: 'messageChannel',
},
});
this.workspaceEventEmitter.emitDatabaseBatchEvent({
objectMetadataNameSingular: 'messageChannel',
action: DatabaseEventAction.DESTROYED,
events: messageChannels.map((messageChannel) => ({
recordId: messageChannel.id,
objectMetadata,
properties: {
before: messageChannel,
},
})),
workspaceId: authContext.workspace.id,
});
return payload;
}

View File

@ -1,9 +1,14 @@
import { Module } from '@nestjs/common';
import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm';
import { ConnectedAccountDeleteOnePreQueryHook } from 'src/modules/connected-account/query-hooks/connected-account-delete-one.pre-query.hook';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
@Module({
imports: [],
imports: [
NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'),
],
providers: [ConnectedAccountDeleteOnePreQueryHook],
})
export class ConnectedAccountQueryHookModule {}

View File

@ -5,13 +5,13 @@ import { objectRecordChangedProperties } from 'src/engine/core-modules/event-emi
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import {
CalendarCreateCompanyAndContactAfterSyncJob,
CalendarCreateCompanyAndContactAfterSyncJobData,
} from 'src/modules/calendar/calendar-event-participant-manager/jobs/calendar-create-company-and-contact-after-sync.job';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import { OnDatabaseEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-event.decorator';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable()
@ -21,7 +21,7 @@ export class AutoCompaniesAndContactsCreationCalendarChannelListener {
private readonly messageQueueService: MessageQueueService,
) {}
@OnDatabaseEvent('calendarChannel', DatabaseEventAction.UPDATED)
@OnDatabaseBatchEvent('calendarChannel', DatabaseEventAction.UPDATED)
async handleUpdatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordUpdateEvent<MessageChannelWorkspaceEntity>

View File

@ -5,13 +5,13 @@ import { objectRecordChangedProperties } from 'src/engine/core-modules/event-emi
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import {
MessagingCreateCompanyAndContactAfterSyncJob,
MessagingCreateCompanyAndContactAfterSyncJobData,
} from 'src/modules/messaging/message-participant-manager/jobs/messaging-create-company-and-contact-after-sync.job';
import { OnDatabaseEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-event.decorator';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable()
@ -21,7 +21,7 @@ export class AutoCompaniesAndContactsCreationMessageChannelListener {
private readonly messageQueueService: MessageQueueService,
) {}
@OnDatabaseEvent('messageChannel', DatabaseEventAction.UPDATED)
@OnDatabaseBatchEvent('messageChannel', DatabaseEventAction.UPDATED)
async handleUpdatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordUpdateEvent<MessageChannelWorkspaceEntity>

View File

@ -5,7 +5,6 @@ import chunk from 'lodash.chunk';
import compact from 'lodash.compact';
import { Any, EntityManager, Repository } from 'typeorm';
import { ObjectRecordCreateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-create.event';
import { FieldActorSource } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
import { FieldMetadataEntity } from 'src/engine/metadata-modules/field-metadata/field-metadata.entity';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
@ -195,21 +194,19 @@ export class CreateCompanyAndContactService {
source,
);
this.workspaceEventEmitter.emit(
`person.${DatabaseEventAction.CREATED}`,
createdPeople.map(
(createdPerson) =>
({
// FixMe: TypeORM typing issue... id is always returned when using save
recordId: createdPerson.id as string,
objectMetadata,
properties: {
after: createdPerson,
},
}) satisfies ObjectRecordCreateEvent<any>,
),
this.workspaceEventEmitter.emitDatabaseBatchEvent({
objectMetadataNameSingular: 'person',
action: DatabaseEventAction.CREATED,
events: createdPeople.map((createdPerson) => ({
// Fix ' as string': TypeORM typing issue... id is always returned when using save
recordId: createdPerson.id as string,
objectMetadata,
properties: {
after: createdPerson,
},
})),
workspaceId,
);
});
}
}
}

View File

@ -1,13 +1,14 @@
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { ObjectRecordDeleteEvent } from 'src/engine/core-modules/event-emitter/types/object-record-delete.event';
import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum';
import { FeatureFlagService } from 'src/engine/core-modules/feature-flag/services/feature-flag.service';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { FavoriteFolderWorkspaceEntity } from 'src/modules/favorite-folder/standard-objects/favorite-folder.workspace-entity';
import { FavoriteWorkspaceEntity } from 'src/modules/favorite/standard-objects/favorite.workspace-entity';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable()
export class FavoriteFolderDeletionListener {
@ -16,7 +17,7 @@ export class FavoriteFolderDeletionListener {
private readonly featureFlagService: FeatureFlagService,
) {}
@OnEvent('favoriteFolder.deleted')
@OnDatabaseBatchEvent('favoriteFolder', DatabaseEventAction.DELETED)
async handleDelete(
payload: WorkspaceEventBatch<
ObjectRecordDeleteEvent<FavoriteFolderWorkspaceEntity>

View File

@ -119,8 +119,8 @@ export class MatchParticipantService<
transactionManager,
);
this.workspaceEventEmitter.emit(
`${objectMetadataName}.matched`,
this.workspaceEventEmitter.emitCustomBatchEvent(
`${objectMetadataName}_matched`,
[
{
workspaceMemberId: null,
@ -174,12 +174,12 @@ export class MatchParticipantService<
},
});
this.workspaceEventEmitter.emit(
`${objectMetadataName}.matched`,
this.workspaceEventEmitter.emitCustomBatchEvent(
`${objectMetadataName}_matched`,
[
{
workspaceId,
name: `${objectMetadataName}.matched`,
name: `${objectMetadataName}_matched`,
workspaceMemberId: null,
participants: updatedParticipants,
},

View File

@ -7,7 +7,7 @@ import { Process } from 'src/engine/core-modules/message-queue/decorators/proces
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import { MessageChannelMessageAssociationWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel-message-association.workspace-entity';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';

View File

@ -7,7 +7,7 @@ import { Process } from 'src/engine/core-modules/message-queue/decorators/proces
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import { MessageChannelSyncStatusService } from 'src/modules/messaging/common/services/message-channel-sync-status.service';
import {

View File

@ -6,7 +6,7 @@ import { ObjectRecordUpdateEvent } from 'src/engine/core-modules/event-emitter/t
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { BlocklistWorkspaceEntity } from 'src/modules/blocklist/standard-objects/blocklist.workspace-entity';
import {
BlocklistItemDeleteMessagesJob,
@ -16,7 +16,7 @@ import {
BlocklistReimportMessagesJob,
BlocklistReimportMessagesJobData,
} from 'src/modules/messaging/blocklist-manager/jobs/messaging-blocklist-reimport-messages.job';
import { OnDatabaseEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-event.decorator';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable({ scope: Scope.REQUEST })
@ -26,7 +26,7 @@ export class MessagingBlocklistListener {
private readonly messageQueueService: MessageQueueService,
) {}
@OnDatabaseEvent('blocklist', DatabaseEventAction.CREATED)
@OnDatabaseBatchEvent('blocklist', DatabaseEventAction.CREATED)
async handleCreatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordCreateEvent<BlocklistWorkspaceEntity>
@ -38,7 +38,7 @@ export class MessagingBlocklistListener {
);
}
@OnDatabaseEvent('blocklist', DatabaseEventAction.CREATED)
@OnDatabaseBatchEvent('blocklist', DatabaseEventAction.CREATED)
async handleDeletedEvent(
payload: WorkspaceEventBatch<
ObjectRecordDeleteEvent<BlocklistWorkspaceEntity>
@ -50,7 +50,7 @@ export class MessagingBlocklistListener {
);
}
@OnDatabaseEvent('blocklist', DatabaseEventAction.UPDATED)
@OnDatabaseBatchEvent('blocklist', DatabaseEventAction.UPDATED)
async handleUpdatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordUpdateEvent<BlocklistWorkspaceEntity>

View File

@ -4,13 +4,13 @@ import { ObjectRecordDeleteEvent } from 'src/engine/core-modules/event-emitter/t
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { ConnectedAccountWorkspaceEntity } from 'src/modules/connected-account/standard-objects/connected-account.workspace-entity';
import {
MessagingConnectedAccountDeletionCleanupJob,
MessagingConnectedAccountDeletionCleanupJobData,
} from 'src/modules/messaging/message-cleaner/jobs/messaging-connected-account-deletion-cleanup.job';
import { OnDatabaseEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-event.decorator';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable()
@ -20,7 +20,7 @@ export class MessagingMessageCleanerConnectedAccountListener {
private readonly messageQueueService: MessageQueueService,
) {}
@OnDatabaseEvent('connectedAccount', DatabaseEventAction.DESTROYED)
@OnDatabaseBatchEvent('connectedAccount', DatabaseEventAction.DESTROYED)
async handleDestroyedEvent(
payload: WorkspaceEventBatch<
ObjectRecordDeleteEvent<ConnectedAccountWorkspaceEntity>

View File

@ -4,13 +4,13 @@ import { ObjectRecordDeleteEvent } from 'src/engine/core-modules/event-emitter/t
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { MessageChannelWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-channel.workspace-entity';
import {
MessagingCleanCacheJob,
MessagingCleanCacheJobData,
} from 'src/modules/messaging/message-import-manager/jobs/messaging-clean-cache';
import { OnDatabaseEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-event.decorator';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable()
@ -20,7 +20,7 @@ export class MessagingMessageImportManagerMessageChannelListener {
private readonly messageQueueService: MessageQueueService,
) {}
@OnDatabaseEvent('messageChannel', DatabaseEventAction.DESTROYED)
@OnDatabaseBatchEvent('messageChannel', DatabaseEventAction.DESTROYED)
async handleDestroyedEvent(
payload: WorkspaceEventBatch<
ObjectRecordDeleteEvent<MessageChannelWorkspaceEntity>

View File

@ -6,7 +6,7 @@ import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperti
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import {
MessageParticipantMatchParticipantJob,
MessageParticipantMatchParticipantJobData,
@ -16,7 +16,7 @@ import {
MessageParticipantUnmatchParticipantJobData,
} from 'src/modules/messaging/message-participant-manager/jobs/message-participant-unmatch-participant.job';
import { PersonWorkspaceEntity } from 'src/modules/person/standard-objects/person.workspace-entity';
import { OnDatabaseEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-event.decorator';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable()
@ -26,7 +26,7 @@ export class MessageParticipantPersonListener {
private readonly messageQueueService: MessageQueueService,
) {}
@OnDatabaseEvent('person', DatabaseEventAction.CREATED)
@OnDatabaseBatchEvent('person', DatabaseEventAction.CREATED)
async handleCreatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordCreateEvent<PersonWorkspaceEntity>
@ -48,7 +48,7 @@ export class MessageParticipantPersonListener {
}
}
@OnDatabaseEvent('person', DatabaseEventAction.UPDATED)
@OnDatabaseBatchEvent('person', DatabaseEventAction.UPDATED)
async handleUpdatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordUpdateEvent<PersonWorkspaceEntity>

View File

@ -13,7 +13,7 @@ import { objectRecordChangedProperties as objectRecordUpdateEventChangedProperti
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import {
MessageParticipantMatchParticipantJob,
MessageParticipantMatchParticipantJobData,
@ -23,7 +23,7 @@ import {
MessageParticipantUnmatchParticipantJobData,
} from 'src/modules/messaging/message-participant-manager/jobs/message-participant-unmatch-participant.job';
import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity';
import { OnDatabaseEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-event.decorator';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable()
@ -35,7 +35,7 @@ export class MessageParticipantWorkspaceMemberListener {
private readonly workspaceRepository: Repository<Workspace>,
) {}
@OnDatabaseEvent('workspaceMember', DatabaseEventAction.CREATED)
@OnDatabaseBatchEvent('workspaceMember', DatabaseEventAction.CREATED)
async handleCreatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordCreateEvent<WorkspaceMemberWorkspaceEntity>
@ -68,7 +68,7 @@ export class MessageParticipantWorkspaceMemberListener {
}
}
@OnDatabaseEvent('workspaceMember', DatabaseEventAction.UPDATED)
@OnDatabaseBatchEvent('workspaceMember', DatabaseEventAction.UPDATED)
async handleUpdatedEvent(
payload: WorkspaceEventBatch<
ObjectRecordUpdateEvent<WorkspaceMemberWorkspaceEntity>

View File

@ -1,5 +1,4 @@
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
@ -7,10 +6,11 @@ import { Repository } from 'typeorm';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/workspace-datasource.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { MessageParticipantWorkspaceEntity } from 'src/modules/messaging/common/standard-objects/message-participant.workspace-entity';
import { TimelineActivityRepository } from 'src/modules/timeline/repositiories/timeline-activity.repository';
import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity';
import { OnCustomBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-custom-batch-event.decorator';
@Injectable()
export class MessageParticipantListener {
@ -22,28 +22,28 @@ export class MessageParticipantListener {
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
) {}
@OnEvent('messageParticipant.matched')
@OnCustomBatchEvent('messageParticipant_matched')
public async handleMessageParticipantMatched(
payload: WorkspaceEventBatch<{
batchEvent: WorkspaceEventBatch<{
workspaceMemberId: string;
participants: MessageParticipantWorkspaceEntity[];
}>,
): Promise<void> {
// TODO: Refactor to insertTimelineActivitiesForObject once
for (const eventPayload of payload.events) {
for (const eventPayload of batchEvent.events) {
const messageParticipants = eventPayload.participants ?? [];
// TODO: move to a job?
const dataSourceSchema = this.workspaceDataSourceService.getSchemaName(
payload.workspaceId,
batchEvent.workspaceId,
);
const messageObjectMetadata =
await this.objectMetadataRepository.findOneOrFail({
where: {
nameSingular: 'message',
workspaceId: payload.workspaceId,
workspaceId: batchEvent.workspaceId,
},
});
@ -64,12 +64,12 @@ export class MessageParticipantListener {
objectName: 'message',
recordId: participant.personId,
workspaceMemberId: eventPayload.workspaceMemberId,
workspaceId: payload.workspaceId,
workspaceId: batchEvent.workspaceId,
linkedObjectMetadataId: messageObjectMetadata.id,
linkedRecordId: participant.messageId,
linkedRecordCachedName: '',
})),
payload.workspaceId,
batchEvent.workspaceId,
);
}
}

View File

@ -3,7 +3,7 @@ import { Process } from 'src/engine/core-modules/message-queue/decorators/proces
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { AuditLogRepository } from 'src/modules/timeline/repositiories/audit-log.repository';
import { AuditLogWorkspaceEntity } from 'src/modules/timeline/standard-objects/audit-log.workspace-entity';
import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository';

View File

@ -3,10 +3,11 @@ import { Process } from 'src/engine/core-modules/message-queue/decorators/proces
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { InjectObjectMetadataRepository } from 'src/engine/object-metadata-repository/object-metadata-repository.decorator';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { TimelineActivityService } from 'src/modules/timeline/services/timeline-activity.service';
import { WorkspaceMemberRepository } from 'src/modules/workspace-member/repositories/workspace-member.repository';
import { WorkspaceMemberWorkspaceEntity } from 'src/modules/workspace-member/standard-objects/workspace-member.workspace-entity';
import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity';
@Processor(MessageQueue.entityEventsToDbQueue)
export class UpsertTimelineActivityFromInternalEvent {
@ -18,7 +19,9 @@ export class UpsertTimelineActivityFromInternalEvent {
@Process(UpsertTimelineActivityFromInternalEvent.name)
async handle(
workspaceEventBatch: WorkspaceEventBatch<ObjectRecordBaseEvent>,
workspaceEventBatch: WorkspaceEventBatch<
ObjectRecordBaseEvent<TimelineActivityWorkspaceEntity>
>,
): Promise<void> {
for (const eventData of workspaceEventBatch.events) {
if (eventData.userId) {

View File

@ -10,7 +10,7 @@ export class AuditLogRepository {
public async insert(
name: string,
properties: string,
properties: object | null,
workspaceMemberId: string | null,
objectName: string,
objectMetadataId: string,

View File

@ -6,13 +6,14 @@ import { WorkspaceDataSourceService } from 'src/engine/workspace-datasource/work
import { TimelineActivityRepository } from 'src/modules/timeline/repositiories/timeline-activity.repository';
import { TimelineActivityWorkspaceEntity } from 'src/modules/timeline/standard-objects/timeline-activity.workspace-entity';
type TimelineActivity = ObjectRecordBaseEvent & {
name: string;
objectName?: string;
linkedRecordCachedName?: string;
linkedRecordId?: string;
linkedObjectMetadataId?: string;
};
type TimelineActivity =
ObjectRecordBaseEvent<TimelineActivityWorkspaceEntity> & {
name: string;
objectName?: string;
linkedRecordCachedName?: string;
linkedRecordId?: string;
linkedObjectMetadataId?: string;
};
@Injectable()
export class TimelineActivityService {
@ -32,7 +33,7 @@ export class TimelineActivityService {
eventName,
workspaceId,
}: {
event: ObjectRecordBaseEvent;
event: ObjectRecordBaseEvent<TimelineActivityWorkspaceEntity>;
eventName: string;
workspaceId: string;
}) {
@ -64,7 +65,7 @@ export class TimelineActivityService {
workspaceId,
eventName,
}: {
event: ObjectRecordBaseEvent;
event: ObjectRecordBaseEvent<TimelineActivityWorkspaceEntity>;
workspaceId: string;
eventName: string;
}): Promise<TimelineActivity[] | undefined> {
@ -100,7 +101,7 @@ export class TimelineActivityService {
workspaceId,
eventName,
}: {
event: ObjectRecordBaseEvent;
event: ObjectRecordBaseEvent<TimelineActivityWorkspaceEntity>;
workspaceId: string;
eventName: string;
}): Promise<TimelineActivity[] | undefined> {
@ -145,7 +146,7 @@ export class TimelineActivityService {
eventName,
workspaceId,
}: {
event: ObjectRecordBaseEvent;
event: ObjectRecordBaseEvent<TimelineActivityWorkspaceEntity>;
dataSourceSchema: string;
activityType: string;
eventName: string;
@ -206,7 +207,7 @@ export class TimelineActivityService {
eventName,
workspaceId,
}: {
event: ObjectRecordBaseEvent;
event: ObjectRecordBaseEvent<TimelineActivityWorkspaceEntity>;
dataSourceSchema: string;
activityType: string;
eventName: string;

View File

@ -10,7 +10,7 @@ import { MessageQueueService } from 'src/engine/core-modules/message-queue/servi
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WebhookWorkspaceEntity } from 'src/modules/webhook/standard-objects/webhook.workspace-entity';
import { ObjectRecordBaseEvent } from 'src/engine/core-modules/event-emitter/types/object-record.base.event';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import {
CallWebhookJob,
CallWebhookJobData,

View File

@ -7,7 +7,6 @@ import { WorkspaceQueryPostHookInstance } from 'src/engine/api/graphql/workspace
import { WorkspaceQueryHook } from 'src/engine/api/graphql/workspace-query-runner/workspace-query-hook/decorators/workspace-query-hook.decorator';
import { WorkspaceQueryHookType } from 'src/engine/api/graphql/workspace-query-runner/workspace-query-hook/types/workspace-query-hook.type';
import { AuthContext } from 'src/engine/core-modules/auth/types/auth-context.type';
import { ObjectRecordCreateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-create.event';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
@ -62,9 +61,10 @@ export class WorkflowCreateManyPostQueryHook
},
});
this.workspaceEventEmitter.emit(
`workflowVersion.${DatabaseEventAction.CREATED}`,
workflowVersionsToCreate.map((workflowVersionToCreate) => {
this.workspaceEventEmitter.emitDatabaseBatchEvent({
objectMetadataNameSingular: 'workflowVersion',
action: DatabaseEventAction.CREATED,
events: workflowVersionsToCreate.map((workflowVersionToCreate) => {
return {
userId: authContext.user?.id,
recordId: workflowVersionToCreate.id,
@ -72,9 +72,9 @@ export class WorkflowCreateManyPostQueryHook
properties: {
after: workflowVersionToCreate,
},
} satisfies ObjectRecordCreateEvent<any>;
};
}),
authContext.workspace.id,
);
workspaceId: authContext.workspace.id,
});
}
}

View File

@ -7,7 +7,6 @@ import { WorkspaceQueryPostHookInstance } from 'src/engine/api/graphql/workspace
import { WorkspaceQueryHook } from 'src/engine/api/graphql/workspace-query-runner/workspace-query-hook/decorators/workspace-query-hook.decorator';
import { WorkspaceQueryHookType } from 'src/engine/api/graphql/workspace-query-runner/workspace-query-hook/types/workspace-query-hook.type';
import { AuthContext } from 'src/engine/core-modules/auth/types/auth-context.type';
import { ObjectRecordCreateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-create.event';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
import { TwentyORMManager } from 'src/engine/twenty-orm/twenty-orm.manager';
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
@ -58,9 +57,10 @@ export class WorkflowCreateOnePostQueryHook
},
});
this.workspaceEventEmitter.emit(
`workflowVersion.${DatabaseEventAction.CREATED}`,
[
this.workspaceEventEmitter.emitDatabaseBatchEvent({
objectMetadataNameSingular: 'workflowVersion',
action: DatabaseEventAction.CREATED,
events: [
{
userId: authContext.user?.id,
recordId: workflowVersionToCreate.id,
@ -68,9 +68,9 @@ export class WorkflowCreateOnePostQueryHook
properties: {
after: workflowVersionToCreate,
},
} satisfies ObjectRecordCreateEvent<any>,
},
],
authContext.workspace.id,
);
workspaceId: authContext.workspace.id,
});
}
}

View File

@ -0,0 +1,2 @@
export const WORKFLOW_VERSION_STATUS_UPDATED =
'workflow_version_status_updated';

View File

@ -5,7 +5,7 @@ import { ObjectRecordDeleteEvent } from 'src/engine/core-modules/event-emitter/t
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import {
WorkflowVersionStatus,
WorkflowVersionWorkspaceEntity,
@ -16,8 +16,10 @@ import {
WorkflowVersionEventType,
WorkflowVersionStatusUpdate,
} from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job';
import { OnDatabaseEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-event.decorator';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { WORKFLOW_VERSION_STATUS_UPDATED } from 'src/modules/workflow/workflow-status/constants/workflow-version-status-updated.constants';
import { OnCustomBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-custom-batch-event.decorator';
@Injectable()
export class WorkflowVersionStatusListener {
@ -26,13 +28,13 @@ export class WorkflowVersionStatusListener {
private readonly messageQueueService: MessageQueueService,
) {}
@OnDatabaseEvent('workflowVersion', DatabaseEventAction.CREATED)
@OnDatabaseBatchEvent('workflowVersion', DatabaseEventAction.CREATED)
async handleWorkflowVersionCreated(
payload: WorkspaceEventBatch<
batchEvent: WorkspaceEventBatch<
ObjectRecordCreateEvent<WorkflowVersionWorkspaceEntity>
>,
): Promise<void> {
const workflowIds = payload.events
const workflowIds = batchEvent.events
.filter(
(event) =>
!event.properties.after.status ||
@ -48,33 +50,33 @@ export class WorkflowVersionStatusListener {
WorkflowStatusesUpdateJob.name,
{
type: WorkflowVersionEventType.CREATE,
workspaceId: payload.workspaceId,
workspaceId: batchEvent.workspaceId,
workflowIds,
},
);
}
@OnDatabaseEvent('workflowVersion', DatabaseEventAction.UPDATED)
@OnCustomBatchEvent(WORKFLOW_VERSION_STATUS_UPDATED)
async handleWorkflowVersionUpdated(
payload: WorkspaceEventBatch<WorkflowVersionStatusUpdate>,
batchEvent: WorkspaceEventBatch<WorkflowVersionStatusUpdate>,
): Promise<void> {
await this.messageQueueService.add<WorkflowVersionBatchEvent>(
WorkflowStatusesUpdateJob.name,
{
type: WorkflowVersionEventType.STATUS_UPDATE,
workspaceId: payload.workspaceId,
statusUpdates: payload.events,
workspaceId: batchEvent.workspaceId,
statusUpdates: batchEvent.events,
},
);
}
@OnDatabaseEvent('workflowVersion', DatabaseEventAction.DELETED)
@OnDatabaseBatchEvent('workflowVersion', DatabaseEventAction.DELETED)
async handleWorkflowVersionDeleted(
payload: WorkspaceEventBatch<
batchEvent: WorkspaceEventBatch<
ObjectRecordDeleteEvent<WorkflowVersionWorkspaceEntity>
>,
): Promise<void> {
const workflowIds = payload.events
const workflowIds = batchEvent.events
.filter(
(event) =>
event.properties.before.status === WorkflowVersionStatus.DRAFT,
@ -89,7 +91,7 @@ export class WorkflowVersionStatusListener {
WorkflowStatusesUpdateJob.name,
{
type: WorkflowVersionEventType.DELETE,
workspaceId: payload.workspaceId,
workspaceId: batchEvent.workspaceId,
workflowIds,
},
);

View File

@ -10,13 +10,13 @@ import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decora
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
import { WorkflowEventListenerWorkspaceEntity } from 'src/modules/workflow/common/standard-objects/workflow-event-listener.workspace-entity';
import {
WorkflowEventTriggerJob,
WorkflowEventTriggerJobData,
} from 'src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job';
import { OnDatabaseEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-event.decorator';
import { OnDatabaseBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-batch-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable()
@ -30,40 +30,40 @@ export class DatabaseEventTriggerListener {
private readonly isFeatureFlagEnabledService: FeatureFlagService,
) {}
@OnDatabaseEvent('*', DatabaseEventAction.CREATED)
@OnDatabaseBatchEvent('*', DatabaseEventAction.CREATED)
async handleObjectRecordCreateEvent(
payload: WorkspaceEventBatch<ObjectRecordCreateEvent<any>>,
payload: WorkspaceEventBatch<ObjectRecordCreateEvent>,
) {
await this.handleEvent(payload);
}
@OnDatabaseEvent('*', DatabaseEventAction.UPDATED)
@OnDatabaseBatchEvent('*', DatabaseEventAction.UPDATED)
async handleObjectRecordUpdateEvent(
payload: WorkspaceEventBatch<ObjectRecordUpdateEvent<any>>,
payload: WorkspaceEventBatch<ObjectRecordUpdateEvent>,
) {
await this.handleEvent(payload);
}
@OnDatabaseEvent('*', DatabaseEventAction.DELETED)
@OnDatabaseBatchEvent('*', DatabaseEventAction.DELETED)
async handleObjectRecordDeleteEvent(
payload: WorkspaceEventBatch<ObjectRecordDeleteEvent<any>>,
payload: WorkspaceEventBatch<ObjectRecordDeleteEvent>,
) {
await this.handleEvent(payload);
}
@OnDatabaseEvent('*', DatabaseEventAction.DESTROYED)
@OnDatabaseBatchEvent('*', DatabaseEventAction.DESTROYED)
async handleObjectRecordDestroyEvent(
payload: WorkspaceEventBatch<ObjectRecordDestroyEvent<any>>,
payload: WorkspaceEventBatch<ObjectRecordDestroyEvent>,
) {
await this.handleEvent(payload);
}
private async handleEvent(
payload: WorkspaceEventBatch<
| ObjectRecordCreateEvent<any>
| ObjectRecordUpdateEvent<any>
| ObjectRecordDeleteEvent<any>
| ObjectRecordDestroyEvent<any>
| ObjectRecordCreateEvent
| ObjectRecordUpdateEvent
| ObjectRecordDeleteEvent
| ObjectRecordDestroyEvent
>,
) {
const workspaceId = payload.workspaceId;

View File

@ -1,17 +1,21 @@
import { Module } from '@nestjs/common';
import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm';
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
import { WorkflowRunnerModule } from 'src/modules/workflow/workflow-runner/workflow-runner.module';
import { DatabaseEventTriggerModule } from 'src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.module';
import { WorkflowEventTriggerJob } from 'src/modules/workflow/workflow-trigger/jobs/workflow-event-trigger.job';
import { WorkflowTriggerWorkspaceService } from 'src/modules/workflow/workflow-trigger/workspace-services/workflow-trigger.workspace-service';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
@Module({
imports: [
WorkflowCommonModule,
WorkflowRunnerModule,
DatabaseEventTriggerModule,
NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'),
],
providers: [
WorkflowTriggerWorkspaceService,

View File

@ -1,6 +1,7 @@
import { Injectable } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { EntityManager } from 'typeorm';
import { EntityManager, Repository } from 'typeorm';
import { buildCreatedByFromWorkspaceMember } from 'src/engine/core-modules/actor/utils/build-created-by-from-workspace-member.util';
import { User } from 'src/engine/core-modules/user/user.entity';
@ -16,7 +17,6 @@ import { WorkflowWorkspaceEntity } from 'src/modules/workflow/common/standard-ob
import { assertWorkflowVersionTriggerIsDefined } from 'src/modules/workflow/common/utils/assert-workflow-version-trigger-is-defined.util';
import { WorkflowCommonWorkspaceService } from 'src/modules/workflow/common/workspace-services/workflow-common.workspace-service';
import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service';
import { WorkflowVersionStatusUpdate } from 'src/modules/workflow/workflow-status/jobs/workflow-statuses-update.job';
import { DatabaseEventTriggerService } from 'src/modules/workflow/workflow-trigger/database-event-trigger/database-event-trigger.service';
import {
WorkflowTriggerException,
@ -26,6 +26,8 @@ import { WorkflowTriggerType } from 'src/modules/workflow/workflow-trigger/types
import { assertVersionCanBeActivated } from 'src/modules/workflow/workflow-trigger/utils/assert-version-can-be-activated.util';
import { assertNever } from 'src/utils/assert';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
import { WORKFLOW_VERSION_STATUS_UPDATED } from 'src/modules/workflow/workflow-status/constants/workflow-version-status-updated.constants';
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
@Injectable()
export class WorkflowTriggerWorkspaceService {
@ -36,14 +38,11 @@ export class WorkflowTriggerWorkspaceService {
private readonly workflowRunnerWorkspaceService: WorkflowRunnerWorkspaceService,
private readonly databaseEventTriggerService: DatabaseEventTriggerService,
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
@InjectRepository(ObjectMetadataEntity, 'metadata')
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
) {}
async runWorkflowVersion(
workflowVersionId: string,
payload: object,
workspaceMemberId: string,
user: User,
) {
private getWorkspaceId() {
const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId;
if (!workspaceId) {
@ -53,12 +52,21 @@ export class WorkflowTriggerWorkspaceService {
);
}
return workspaceId;
}
async runWorkflowVersion(
workflowVersionId: string,
payload: object,
workspaceMemberId: string,
user: User,
) {
await this.workflowCommonWorkspaceService.getWorkflowVersionOrFail(
workflowVersionId,
);
return await this.workflowRunnerWorkspaceService.run(
workspaceId,
this.getWorkspaceId(),
workflowVersionId,
payload,
buildCreatedByFromWorkspaceMember(workspaceMemberId, user),
@ -246,10 +254,10 @@ export class WorkflowTriggerWorkspaceService {
manager,
);
this.emitStatusUpdateEventOrThrow(
workflowVersion.workflowId,
workflowVersion.status,
await this.emitStatusUpdateEvents(
workflowVersion,
WorkflowVersionStatus.ACTIVE,
this.getWorkspaceId(),
);
}
@ -271,10 +279,10 @@ export class WorkflowTriggerWorkspaceService {
manager,
);
this.emitStatusUpdateEventOrThrow(
workflowVersion.workflowId,
workflowVersion.status,
await this.emitStatusUpdateEvents(
workflowVersion,
WorkflowVersionStatus.DEACTIVATED,
this.getWorkspaceId(),
);
}
@ -348,28 +356,45 @@ export class WorkflowTriggerWorkspaceService {
}
}
private emitStatusUpdateEventOrThrow(
workflowId: string,
previousStatus: WorkflowVersionStatus,
private async emitStatusUpdateEvents(
workflowVersion: WorkflowVersionWorkspaceEntity,
newStatus: WorkflowVersionStatus,
workspaceId: string,
) {
const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId;
const objectMetadata = await this.objectMetadataRepository.findOneOrFail({
where: {
nameSingular: 'workflowVersion',
},
});
if (!workspaceId) {
throw new WorkflowTriggerException(
'No workspace id found',
WorkflowTriggerExceptionCode.INTERNAL_ERROR,
);
}
this.workspaceEventEmitter.emitDatabaseBatchEvent({
objectMetadataNameSingular: 'workflowVersion',
action: DatabaseEventAction.UPDATED,
events: [
{
recordId: workflowVersion.id,
objectMetadata,
properties: {
before: workflowVersion,
after: { ...workflowVersion, status: newStatus },
updatedFields: ['status'],
diff: {
status: { before: workflowVersion.status, after: newStatus },
},
},
},
],
workspaceId,
});
this.workspaceEventEmitter.emit(
`workflowVersion.${DatabaseEventAction.UPDATED}`,
this.workspaceEventEmitter.emitCustomBatchEvent(
WORKFLOW_VERSION_STATUS_UPDATED,
[
{
workflowId,
previousStatus,
workflowId: workflowVersion.workflowId,
previousStatus: workflowVersion.status,
newStatus,
} satisfies WorkflowVersionStatusUpdate,
},
],
workspaceId,
);