6071 return only updated fields of records in zapier update trigger (#8193)

- move webhook triggers into `entity-events-to-db.listener.ts`
- refactor event management
- add a `@OnDatabaseEvent` decorator to manage database events
- add updatedFields in updated events
- update openApi webhooks docs
- update zapier integration
This commit is contained in:
martmull
2024-11-04 17:44:36 +01:00
committed by GitHub
parent 741020fbb0
commit 695991881f
62 changed files with 547 additions and 578 deletions

View File

@ -1,91 +0,0 @@
import { Logger } from '@nestjs/common';
import { ArrayContains } from 'typeorm';
import { ObjectMetadataInterface } from 'src/engine/metadata-modules/field-metadata/interfaces/object-metadata.interface';
import {
CallWebhookJob,
CallWebhookJobData,
} from 'src/engine/api/graphql/workspace-query-runner/jobs/call-webhook.job';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { TwentyORMGlobalManager } from 'src/engine/twenty-orm/twenty-orm-global.manager';
import { WebhookWorkspaceEntity } from 'src/modules/webhook/standard-objects/webhook.workspace-entity';
export enum CallWebhookJobsJobOperation {
create = 'create',
update = 'update',
delete = 'delete',
destroy = 'destroy',
}
export type CallWebhookJobsJobData = {
workspaceId: string;
objectMetadataItem: ObjectMetadataInterface;
record: any;
operation: CallWebhookJobsJobOperation;
};
@Processor(MessageQueue.webhookQueue)
export class CallWebhookJobsJob {
private readonly logger = new Logger(CallWebhookJobsJob.name);
constructor(
@InjectMessageQueue(MessageQueue.webhookQueue)
private readonly messageQueueService: MessageQueueService,
private readonly twentyORMGlobalManager: TwentyORMGlobalManager,
) {}
@Process(CallWebhookJobsJob.name)
async handle(data: CallWebhookJobsJobData): Promise<void> {
// If you change that function, double check it does not break Zapier
// trigger in packages/twenty-zapier/src/triggers/trigger_record.ts
const webhookRepository =
await this.twentyORMGlobalManager.getRepositoryForWorkspace<WebhookWorkspaceEntity>(
data.workspaceId,
'webhook',
);
const nameSingular = data.objectMetadataItem.nameSingular;
const operation = data.operation;
const eventName = `${nameSingular}.${operation}`;
const webhooks = await webhookRepository.find({
where: [
{ operations: ArrayContains([eventName]) },
{ operations: ArrayContains([`*.${operation}`]) },
{ operations: ArrayContains([`${nameSingular}.*`]) },
{ operations: ArrayContains(['*.*']) },
],
});
webhooks.forEach((webhook) => {
this.messageQueueService.add<CallWebhookJobData>(
CallWebhookJob.name,
{
targetUrl: webhook.targetUrl,
eventName,
objectMetadata: {
id: data.objectMetadataItem.id,
nameSingular: data.objectMetadataItem.nameSingular,
},
workspaceId: data.workspaceId,
webhookId: webhook.id,
eventDate: new Date(),
record: data.record,
},
{ retryLimit: 3 },
);
});
webhooks.length > 0 &&
this.logger.log(
`CallWebhookJobsJob on eventName '${eventName}' triggered webhooks with ids [\n"${webhooks.map((webhook) => webhook.id).join('",\n"')}"\n]`,
);
}
}

View File

@ -1,67 +0,0 @@
import { HttpService } from '@nestjs/axios';
import { Logger } from '@nestjs/common';
import { AnalyticsService } from 'src/engine/core-modules/analytics/analytics.service';
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
export type CallWebhookJobData = {
targetUrl: string;
eventName: string;
objectMetadata: { id: string; nameSingular: string };
workspaceId: string;
webhookId: string;
eventDate: Date;
record: any;
};
@Processor(MessageQueue.webhookQueue)
export class CallWebhookJob {
private readonly logger = new Logger(CallWebhookJob.name);
constructor(
private readonly httpService: HttpService,
private readonly analyticsService: AnalyticsService,
) {}
@Process(CallWebhookJob.name)
async handle(data: CallWebhookJobData): Promise<void> {
const commonPayload = {
url: data.targetUrl,
webhookId: data.webhookId,
eventName: data.eventName,
};
try {
const response = await this.httpService.axiosRef.post(
data.targetUrl,
data,
);
const success = response.status >= 200 && response.status < 300;
const eventInput = {
action: 'webhook.response',
payload: {
status: response.status,
success,
...commonPayload,
},
};
this.analyticsService.create(eventInput, 'webhook', data.workspaceId);
} catch (err) {
const eventInput = {
action: 'webhook.response',
payload: {
success: false,
...commonPayload,
...(err.response && { status: err.response.status }),
},
};
this.analyticsService.create(eventInput, 'webhook', data.workspaceId);
this.logger.error(
`Error calling webhook on targetUrl '${data.targetUrl}': ${err}`,
);
}
}
}

View File

@ -1,8 +1,6 @@
import { HttpModule } from '@nestjs/axios';
import { Module } from '@nestjs/common';
import { CallWebhookJobsJob } from 'src/engine/api/graphql/workspace-query-runner/jobs/call-webhook-jobs.job';
import { CallWebhookJob } from 'src/engine/api/graphql/workspace-query-runner/jobs/call-webhook.job';
import { RecordPositionBackfillJob } from 'src/engine/api/graphql/workspace-query-runner/jobs/record-position-backfill.job';
import { RecordPositionBackfillModule } from 'src/engine/api/graphql/workspace-query-runner/services/record-position-backfill-module';
import { AnalyticsModule } from 'src/engine/core-modules/analytics/analytics.module';
@ -14,9 +12,7 @@ import { WorkspaceDataSourceModule } from 'src/engine/workspace-datasource/works
WorkspaceDataSourceModule,
DataSourceModule,
RecordPositionBackfillModule,
HttpModule,
AnalyticsModule,
],
providers: [CallWebhookJobsJob, CallWebhookJob, RecordPositionBackfillJob],
providers: [RecordPositionBackfillJob],
})
export class WorkspaceQueryRunnerJobModule {}

View File

@ -1,55 +1,49 @@
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import { ObjectRecordCreateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-create.event';
import { ObjectRecordUpdateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-update.event';
import { ObjectRecordBaseEvent } from 'src/engine/core-modules/event-emitter/types/object-record.base.event';
import { objectRecordChangedValues } from 'src/engine/core-modules/event-emitter/utils/object-record-changed-values';
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { CreateAuditLogFromInternalEvent } from 'src/modules/timeline/jobs/create-audit-log-from-internal-event';
import { UpsertTimelineActivityFromInternalEvent } from 'src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job';
import { OnDatabaseEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-event.decorator';
import { CallWebhookJobsJob } from 'src/modules/webhook/jobs/call-webhook-jobs.job';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable()
export class EntityEventsToDbListener {
constructor(
@InjectMessageQueue(MessageQueue.entityEventsToDbQueue)
private readonly messageQueueService: MessageQueueService,
private readonly entityEventsToDbQueueService: MessageQueueService,
@InjectMessageQueue(MessageQueue.webhookQueue)
private readonly webhookQueueService: MessageQueueService,
) {}
@OnEvent('*.created')
@OnDatabaseEvent('*', DatabaseEventAction.CREATED)
async handleCreate(
payload: WorkspaceEventBatch<ObjectRecordCreateEvent<any>>,
) {
return this.handle(payload);
}
@OnEvent('*.updated')
@OnDatabaseEvent('*', DatabaseEventAction.UPDATED)
async handleUpdate(
payload: WorkspaceEventBatch<ObjectRecordUpdateEvent<any>>,
) {
for (const eventPayload of payload.events) {
eventPayload.properties.diff = objectRecordChangedValues(
eventPayload.properties.before,
eventPayload.properties.after,
eventPayload.properties.updatedFields,
eventPayload.objectMetadata,
);
}
return this.handle(payload);
}
@OnEvent('*.deleted')
@OnDatabaseEvent('*', DatabaseEventAction.DELETED)
async handleDelete(
payload: WorkspaceEventBatch<ObjectRecordUpdateEvent<any>>,
) {
return this.handle(payload);
}
@OnEvent('*.destroyed')
@OnDatabaseEvent('*', DatabaseEventAction.DESTROYED)
async handleDestroy(
payload: WorkspaceEventBatch<ObjectRecordUpdateEvent<any>>,
) {
@ -61,18 +55,22 @@ export class EntityEventsToDbListener {
(event) => event.objectMetadata?.isAuditLogged,
);
await this.messageQueueService.add<
await this.entityEventsToDbQueueService.add<
WorkspaceEventBatch<ObjectRecordBaseEvent>
>(CreateAuditLogFromInternalEvent.name, {
...payload,
events: filteredEvents,
});
await this.messageQueueService.add<
await this.entityEventsToDbQueueService.add<
WorkspaceEventBatch<ObjectRecordBaseEvent>
>(UpsertTimelineActivityFromInternalEvent.name, {
...payload,
events: filteredEvents,
});
await this.webhookQueueService.add<
WorkspaceEventBatch<ObjectRecordBaseEvent>
>(CallWebhookJobsJob.name, payload, { retryLimit: 3 });
}
}

View File

@ -5,6 +5,8 @@ import { AnalyticsService } from 'src/engine/core-modules/analytics/analytics.se
import { ObjectRecordCreateEvent } from 'src/engine/core-modules/event-emitter/types/object-record-create.event';
import { TelemetryService } from 'src/engine/core-modules/telemetry/telemetry.service';
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/workspace-event.type';
import { OnDatabaseEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-database-event.decorator';
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
@Injectable()
export class TelemetryListener {
@ -13,7 +15,7 @@ export class TelemetryListener {
private readonly telemetryService: TelemetryService,
) {}
@OnEvent('*.created')
@OnDatabaseEvent('*', DatabaseEventAction.CREATED)
async handleAllCreate(
payload: WorkspaceEventBatch<ObjectRecordCreateEvent<any>>,
) {