Improve webhook (#3459)
* Add trigger record * Merge triggers * Merge creates * Fix libraries * Fix create merged key * Rename file * Remove list Record Ids * Revert "Rename file" This reverts commit 2e72e05793ced4553eec8d9f890d31beae594c85. * Revert "Revert "Rename file"" This reverts commit e2d93fa02716093df6d4d6029af9cc324c06f06b. * Revert "Remove list Record Ids" This reverts commit 6653fb6ccd4307e3958b70923505034d92cf43bb. * Remove namePlural field * Use name singular for webhooks * Send webhook metadata * Extract resource from zapier webhook * Fix package.json * Fix package.json * Update payload * Fix package.json * Update payload * Update payload * Rename file * Use wildcard in webhook events * Fix nameSingular * Code review returns * Code review returns
This commit is contained in:
@ -22,7 +22,7 @@ export enum CallWebhookJobsJobOperation {
|
||||
export type CallWebhookJobsJobData = {
|
||||
workspaceId: string;
|
||||
objectMetadataItem: ObjectMetadataInterface;
|
||||
recordData: any;
|
||||
record: any;
|
||||
operation: CallWebhookJobsJobOperation;
|
||||
};
|
||||
|
||||
@ -41,11 +41,6 @@ export class CallWebhookJobsJob
|
||||
) {}
|
||||
|
||||
async handle(data: CallWebhookJobsJobData): Promise<void> {
|
||||
const objectMetadataItem =
|
||||
await this.objectMetadataService.findOneOrFailWithinWorkspace(
|
||||
data.workspaceId,
|
||||
{ where: { nameSingular: data.objectMetadataItem.nameSingular } },
|
||||
);
|
||||
const dataSourceMetadata =
|
||||
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
|
||||
data.workspaceId,
|
||||
@ -54,27 +49,45 @@ export class CallWebhookJobsJob
|
||||
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
|
||||
data.workspaceId,
|
||||
);
|
||||
const operationName = `${data.operation}.${objectMetadataItem.namePlural}`;
|
||||
const nameSingular = data.objectMetadataItem.nameSingular;
|
||||
const operation = data.operation;
|
||||
const eventType = `${operation}.${nameSingular}`;
|
||||
const webhooks: { id: string; targetUrl: string }[] =
|
||||
await workspaceDataSource?.query(
|
||||
`SELECT * FROM ${dataSourceMetadata.schema}."webhook" WHERE operation='${operationName}'`,
|
||||
`
|
||||
SELECT * FROM ${dataSourceMetadata.schema}."webhook"
|
||||
WHERE operation LIKE '%${eventType}%'
|
||||
OR operation LIKE '%*.${nameSingular}%'
|
||||
OR operation LIKE '%${operation}.*%'
|
||||
OR operation LIKE '%*.*%'
|
||||
`,
|
||||
);
|
||||
|
||||
webhooks.forEach((webhook) => {
|
||||
this.messageQueueService.add<CallWebhookJobData>(
|
||||
CallWebhookJob.name,
|
||||
{
|
||||
recordData: data.recordData,
|
||||
targetUrl: webhook.targetUrl,
|
||||
eventType,
|
||||
objectMetadata: {
|
||||
id: data.objectMetadataItem.id,
|
||||
nameSingular: data.objectMetadataItem.nameSingular,
|
||||
},
|
||||
workspaceId: data.workspaceId,
|
||||
webhookId: webhook.id,
|
||||
eventDate: new Date(),
|
||||
record: data.record,
|
||||
},
|
||||
{ retryLimit: 3 },
|
||||
);
|
||||
});
|
||||
|
||||
this.logger.log(
|
||||
`CallWebhookJobsJob on operation '${operationName}' called on webhooks ids [\n"${webhooks
|
||||
.map((webhook) => webhook.id)
|
||||
.join('",\n"')}"\n]`,
|
||||
);
|
||||
if (webhooks.length) {
|
||||
this.logger.log(
|
||||
`CallWebhookJobsJob on eventType '${eventType}' called on webhooks ids [\n"${webhooks
|
||||
.map((webhook) => webhook.id)
|
||||
.join('",\n"')}"\n]`,
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -5,7 +5,12 @@ import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/messa
|
||||
|
||||
export type CallWebhookJobData = {
|
||||
targetUrl: string;
|
||||
recordData: any;
|
||||
eventType: string;
|
||||
objectMetadata: { id: string; nameSingular: string };
|
||||
workspaceId: string;
|
||||
webhookId: string;
|
||||
eventDate: Date;
|
||||
record: any;
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
@ -16,11 +21,9 @@ export class CallWebhookJob implements MessageQueueJob<CallWebhookJobData> {
|
||||
|
||||
async handle(data: CallWebhookJobData): Promise<void> {
|
||||
try {
|
||||
await this.httpService.axiosRef.post(data.targetUrl, data.recordData);
|
||||
await this.httpService.axiosRef.post(data.targetUrl, data);
|
||||
this.logger.log(
|
||||
`CallWebhookJob successfully called on targetUrl '${
|
||||
data.targetUrl
|
||||
}' with data: ${JSON.stringify(data.recordData)}`,
|
||||
`CallWebhookJob successfully called on targetUrl '${data.targetUrl}'`,
|
||||
);
|
||||
} catch (err) {
|
||||
throw new Error(
|
||||
|
||||
@ -388,7 +388,7 @@ export class WorkspaceQueryRunnerService {
|
||||
this.messageQueueService.add<CallWebhookJobsJobData>(
|
||||
CallWebhookJobsJob.name,
|
||||
{
|
||||
recordData: jobData,
|
||||
record: jobData,
|
||||
workspaceId: options.workspaceId,
|
||||
operation,
|
||||
objectMetadataItem: options.objectMetadataItem,
|
||||
|
||||
Reference in New Issue
Block a user