2248 zapier integration implement typeorm eventsubscribers (#3122)

* Add new queue to twenty-server

* Add triggers to zapier

* Rename webhook operation

* Use find one or fail

* Use logger

* Fix typescript templating

* Add dedicated call webhook job

* Update logging

* Fix error handling
This commit is contained in:
martmull
2024-01-03 18:09:57 +01:00
committed by GitHub
parent 4ebaacc306
commit 65250839fb
36 changed files with 1040 additions and 209 deletions

View File

@ -0,0 +1,79 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { ObjectMetadataService } from 'src/metadata/object-metadata/object-metadata.service';
import { DataSourceService } from 'src/metadata/data-source/data-source.service';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import {
CallWebhookJob,
CallWebhookJobData,
} from 'src/workspace/workspace-query-runner/jobs/call-webhook.job';
export enum CallWebhookJobsJobOperation {
create = 'create',
update = 'update',
delete = 'delete',
}
export type CallWebhookJobsJobData = {
workspaceId: string;
objectNameSingular: string;
recordData: any;
operation: CallWebhookJobsJobOperation;
};
@Injectable()
export class CallWebhookJobsJob
implements MessageQueueJob<CallWebhookJobsJobData>
{
private readonly logger = new Logger(CallWebhookJobsJob.name);
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly objectMetadataService: ObjectMetadataService,
private readonly dataSourceService: DataSourceService,
@Inject(MessageQueue.webhookQueue)
private readonly messageQueueService: MessageQueueService,
) {}
async handle(data: CallWebhookJobsJobData): Promise<void> {
const objectMetadataItem =
await this.objectMetadataService.findOneOrFailWithinWorkspace(
data.workspaceId,
{ where: { nameSingular: data.objectNameSingular } },
);
const dataSourceMetadata =
await this.dataSourceService.getLastDataSourceMetadataFromWorkspaceIdOrFail(
data.workspaceId,
);
const workspaceDataSource =
await this.workspaceDataSourceService.connectToWorkspaceDataSource(
data.workspaceId,
);
const operationName = `${data.operation}.${objectMetadataItem.namePlural}`;
const webhooks: { id: string; targetUrl: string }[] =
await workspaceDataSource?.query(
`SELECT * FROM ${dataSourceMetadata.schema}."webhook" WHERE operation='${operationName}'`,
);
webhooks.forEach((webhook) => {
this.messageQueueService.add<CallWebhookJobData>(
CallWebhookJob.name,
{
recordData: data.recordData,
targetUrl: webhook.targetUrl,
},
{ retryLimit: 3 },
);
});
this.logger.log(
`CallWebhookJobsJob on operation '${operationName}' called on webhooks ids [\n"${webhooks
.map((webhook) => webhook.id)
.join('",\n"')}"\n]`,
);
}
}

View File

@ -0,0 +1,31 @@
import { Injectable, Logger } from '@nestjs/common';
import { HttpService } from '@nestjs/axios';
import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface';
export type CallWebhookJobData = {
targetUrl: string;
recordData: any;
};
@Injectable()
export class CallWebhookJob implements MessageQueueJob<CallWebhookJobData> {
private readonly logger = new Logger(CallWebhookJob.name);
constructor(private readonly httpService: HttpService) {}
async handle(data: CallWebhookJobData): Promise<void> {
try {
await this.httpService.axiosRef.post(data.targetUrl, data.recordData);
this.logger.log(
`CallWebhookJob successfully called on targetUrl '${
data.targetUrl
}' with data: ${JSON.stringify(data.recordData)}`,
);
} catch (err) {
throw new Error(
`Error calling webhook on targetUrl '${data.targetUrl}': ${err}`,
);
}
}
}

View File

@ -1,5 +1,6 @@
import {
BadRequestException,
Inject,
Injectable,
InternalServerErrorException,
Logger,
@ -24,6 +25,13 @@ import {
import { WorkspaceQueryBuilderFactory } from 'src/workspace/workspace-query-builder/workspace-query-builder.factory';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import {
CallWebhookJobsJob,
CallWebhookJobsJobData,
CallWebhookJobsJobOperation,
} from 'src/workspace/workspace-query-runner/jobs/call-webhook-jobs.job';
import { parseResult } from 'src/workspace/workspace-query-runner/utils/parse-result.util';
import { ExceptionHandlerService } from 'src/integrations/exception-handler/exception-handler.service';
import { globalExceptionHandler } from 'src/filters/utils/global-exception-handler.util';
@ -41,6 +49,8 @@ export class WorkspaceQueryRunnerService {
constructor(
private readonly workspaceQueryBuilderFactory: WorkspaceQueryBuilderFactory,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
@Inject(MessageQueue.webhookQueue)
private readonly messageQueueService: MessageQueueService,
private readonly exceptionHandlerService: ExceptionHandlerService,
) {}
@ -117,11 +127,19 @@ export class WorkspaceQueryRunnerService {
);
const result = await this.execute(query, workspaceId);
return this.parseResult<PGGraphQLMutation<Record>>(
const parsedResults = this.parseResult<PGGraphQLMutation<Record>>(
result,
targetTableName,
'insertInto',
)?.records;
await this.triggerWebhooks<Record>(
parsedResults,
CallWebhookJobsJobOperation.create,
options,
);
return parsedResults;
} catch (exception) {
const error = globalExceptionHandler(
exception,
@ -136,9 +154,15 @@ export class WorkspaceQueryRunnerService {
args: CreateOneResolverArgs<Record>,
options: WorkspaceQueryRunnerOptions,
): Promise<Record | undefined> {
const records = await this.createMany({ data: [args.data] }, options);
const results = await this.createMany({ data: [args.data] }, options);
return records?.[0];
await this.triggerWebhooks<Record>(
results,
CallWebhookJobsJobOperation.create,
options,
);
return results?.[0];
}
async updateOne<Record extends IRecord = IRecord>(
@ -153,11 +177,19 @@ export class WorkspaceQueryRunnerService {
);
const result = await this.execute(query, workspaceId);
return this.parseResult<PGGraphQLMutation<Record>>(
const parsedResults = this.parseResult<PGGraphQLMutation<Record>>(
result,
targetTableName,
'update',
)?.records?.[0];
)?.records;
await this.triggerWebhooks<Record>(
parsedResults,
CallWebhookJobsJobOperation.update,
options,
);
return parsedResults?.[0];
} catch (exception) {
const error = globalExceptionHandler(
exception,
@ -180,11 +212,19 @@ export class WorkspaceQueryRunnerService {
);
const result = await this.execute(query, workspaceId);
return this.parseResult<PGGraphQLMutation<Record>>(
const parsedResults = this.parseResult<PGGraphQLMutation<Record>>(
result,
targetTableName,
'deleteFrom',
)?.records?.[0];
)?.records;
await this.triggerWebhooks<Record>(
parsedResults,
CallWebhookJobsJobOperation.delete,
options,
);
return parsedResults?.[0];
} catch (exception) {
const error = globalExceptionHandler(
exception,
@ -207,11 +247,19 @@ export class WorkspaceQueryRunnerService {
);
const result = await this.execute(query, workspaceId);
return this.parseResult<PGGraphQLMutation<Record>>(
const parsedResults = this.parseResult<PGGraphQLMutation<Record>>(
result,
targetTableName,
'update',
)?.records;
await this.triggerWebhooks<Record>(
parsedResults,
CallWebhookJobsJobOperation.update,
options,
);
return parsedResults;
} catch (exception) {
const error = globalExceptionHandler(
exception,
@ -237,11 +285,19 @@ export class WorkspaceQueryRunnerService {
);
const result = await this.execute(query, workspaceId);
return this.parseResult<PGGraphQLMutation<Record>>(
const parsedResults = this.parseResult<PGGraphQLMutation<Record>>(
result,
targetTableName,
'deleteFrom',
)?.records;
await this.triggerWebhooks<Record>(
parsedResults,
CallWebhookJobsJobOperation.delete,
options,
);
return parsedResults;
} catch (exception) {
const error = globalExceptionHandler(
exception,
@ -306,4 +362,26 @@ export class WorkspaceQueryRunnerService {
return this.parseResult(result, targetTableName, command);
}
async triggerWebhooks<Record>(
jobsData: Record[] | undefined,
operation: CallWebhookJobsJobOperation,
options: WorkspaceQueryRunnerOptions,
) {
if (!Array.isArray(jobsData)) {
return;
}
jobsData.forEach((jobData) => {
this.messageQueueService.add<CallWebhookJobsJobData>(
CallWebhookJobsJob.name,
{
recordData: jobData,
workspaceId: options.workspaceId,
operation,
objectNameSingular: options.targetTableName,
},
{ retryLimit: 3 },
);
});
}
}