Ws poc (#11293)
related to https://github.com/twentyhq/core-team-issues/issues/601 ## Done - add a `onDbEvent` `Subscription` graphql endpoint to listen to database_event using what we have done with webhooks: - you can subscribe to any `action` (created, updated, ...) for any `objectNameSingular` or a specific `recordId`. Parameters are nullable and treated as wildcards when null. - returns events with following shape ```typescript @Field(() => String) eventId: string; @Field() emittedAt: string; @Field(() => DatabaseEventAction) action: DatabaseEventAction; @Field(() => String) objectNameSingular: string; @Field(() => GraphQLJSON) record: ObjectRecord; @Field(() => [String], { nullable: true }) updatedFields?: string[]; ``` - front provide a componentEffect `<ListenRecordUpdatesEffect />` that listen for an `objectNameSingular`, a `recordId` and a list of `listenedFields`. It subscribes to record updates and updates its apollo cached value for specified `listenedFields` - subscription is protected with credentials ## Result Here is an application with `workflowRun` https://github.com/user-attachments/assets/c964d857-3b54-495f-bf14-587ba26c5a8c --------- Co-authored-by: prastoin <paul@twenty.com>
This commit is contained in:
@ -16,6 +16,7 @@ import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/wo
|
||||
import { CreateAuditLogFromInternalEvent } from 'src/modules/timeline/jobs/create-audit-log-from-internal-event';
|
||||
import { UpsertTimelineActivityFromInternalEvent } from 'src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job';
|
||||
import { CallWebhookJobsJob } from 'src/modules/webhook/jobs/call-webhook-jobs.job';
|
||||
import { SubscriptionsJob } from 'src/engine/subscriptions/subscriptions.job';
|
||||
|
||||
@Injectable()
|
||||
export class EntityEventsToDbListener {
|
||||
@ -24,6 +25,8 @@ export class EntityEventsToDbListener {
|
||||
private readonly entityEventsToDbQueueService: MessageQueueService,
|
||||
@InjectMessageQueue(MessageQueue.webhookQueue)
|
||||
private readonly webhookQueueService: MessageQueueService,
|
||||
@InjectMessageQueue(MessageQueue.subscriptionsQueue)
|
||||
private readonly subscriptionsQueueService: MessageQueueService,
|
||||
) {}
|
||||
|
||||
@OnDatabaseBatchEvent('*', DatabaseEventAction.CREATED)
|
||||
@ -64,6 +67,11 @@ export class EntityEventsToDbListener {
|
||||
);
|
||||
|
||||
await Promise.all([
|
||||
this.subscriptionsQueueService.add<WorkspaceEventBatch<T>>(
|
||||
SubscriptionsJob.name,
|
||||
batchEvent,
|
||||
{ retryLimit: 3 },
|
||||
),
|
||||
this.webhookQueueService.add<WorkspaceEventBatch<T>>(
|
||||
CallWebhookJobsJob.name,
|
||||
batchEvent,
|
||||
|
||||
@ -1,7 +1,6 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
|
||||
import { GraphqlQueryRunnerModule } from 'src/engine/api/graphql/graphql-query-runner/graphql-query-runner.module';
|
||||
import { WorkspaceQueryRunnerModule } from 'src/engine/api/graphql/workspace-query-runner/workspace-query-runner.module';
|
||||
import { WorkspaceResolverBuilderService } from 'src/engine/api/graphql/workspace-resolver-builder/workspace-resolver-builder.service';
|
||||
import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module';
|
||||
|
||||
@ -10,11 +9,7 @@ import { WorkspaceResolverFactory } from './workspace-resolver.factory';
|
||||
import { workspaceResolverBuilderFactories } from './factories/factories';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
WorkspaceQueryRunnerModule,
|
||||
GraphqlQueryRunnerModule,
|
||||
FeatureFlagModule,
|
||||
],
|
||||
imports: [GraphqlQueryRunnerModule, FeatureFlagModule],
|
||||
providers: [
|
||||
...workspaceResolverBuilderFactories,
|
||||
WorkspaceResolverFactory,
|
||||
|
||||
@ -48,6 +48,8 @@ import { WorkspaceInvitationModule } from 'src/engine/core-modules/workspace-inv
|
||||
import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module';
|
||||
import { RoleModule } from 'src/engine/metadata-modules/role/role.module';
|
||||
import { WorkspaceEventEmitterModule } from 'src/engine/workspace-event-emitter/workspace-event-emitter.module';
|
||||
import { WorkspaceQueryRunnerModule } from 'src/engine/api/graphql/workspace-query-runner/workspace-query-runner.module';
|
||||
import { SubscriptionsModule } from 'src/engine/subscriptions/subscriptions.module';
|
||||
|
||||
import { AnalyticsModule } from './analytics/analytics.module';
|
||||
import { ClientConfigModule } from './client-config/client-config.module';
|
||||
@ -81,6 +83,8 @@ import { FileModule } from './file/file.module';
|
||||
RoleModule,
|
||||
TwentyConfigModule,
|
||||
RedisClientModule,
|
||||
WorkspaceQueryRunnerModule,
|
||||
SubscriptionsModule,
|
||||
FileStorageModule.forRootAsync({
|
||||
useFactory: fileStorageModuleFactory,
|
||||
inject: [TwentyConfigService],
|
||||
|
||||
@ -32,6 +32,7 @@ import { TimelineJobModule } from 'src/modules/timeline/jobs/timeline-job.module
|
||||
import { TimelineActivityModule } from 'src/modules/timeline/timeline-activity.module';
|
||||
import { WebhookJobModule } from 'src/modules/webhook/jobs/webhook-job.module';
|
||||
import { WorkflowModule } from 'src/modules/workflow/workflow.module';
|
||||
import { SubscriptionsModule } from 'src/engine/subscriptions/subscriptions.module';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
@ -58,6 +59,7 @@ import { WorkflowModule } from 'src/modules/workflow/workflow.module';
|
||||
WorkflowModule,
|
||||
FavoriteModule,
|
||||
WorkspaceCleanerModule,
|
||||
SubscriptionsModule,
|
||||
],
|
||||
providers: [
|
||||
CleanSuspendedWorkspacesJob,
|
||||
|
||||
@ -19,4 +19,5 @@ export enum MessageQueue {
|
||||
workflowQueue = 'workflow-queue',
|
||||
serverlessFunctionQueue = 'serverless-function-queue',
|
||||
deleteCascadeQueue = 'delete-cascade-queue',
|
||||
subscriptionsQueue = 'subscriptions-queue',
|
||||
}
|
||||
|
||||
@ -0,0 +1,30 @@
|
||||
import { Field, ObjectType, registerEnumType } from '@nestjs/graphql';
|
||||
|
||||
import GraphQLJSON from 'graphql-type-json';
|
||||
|
||||
import { ObjectRecord } from 'src/engine/api/graphql/workspace-query-builder/interfaces/object-record.interface';
|
||||
|
||||
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
|
||||
|
||||
registerEnumType(DatabaseEventAction, {
|
||||
name: 'DatabaseEventAction',
|
||||
description: 'Database Event Action',
|
||||
});
|
||||
|
||||
@ObjectType()
|
||||
export class OnDbEventDTO {
|
||||
@Field(() => DatabaseEventAction)
|
||||
action: DatabaseEventAction;
|
||||
|
||||
@Field(() => String)
|
||||
objectNameSingular: string;
|
||||
|
||||
@Field()
|
||||
eventDate: Date;
|
||||
|
||||
@Field(() => GraphQLJSON)
|
||||
record: ObjectRecord;
|
||||
|
||||
@Field(() => [String], { nullable: true })
|
||||
updatedFields?: string[];
|
||||
}
|
||||
@ -0,0 +1,15 @@
|
||||
import { Field, InputType } from '@nestjs/graphql';
|
||||
|
||||
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
|
||||
|
||||
@InputType()
|
||||
export class OnDbEventInput {
|
||||
@Field(() => DatabaseEventAction, { nullable: true })
|
||||
action?: DatabaseEventAction;
|
||||
|
||||
@Field(() => String, { nullable: true })
|
||||
objectNameSingular?: string;
|
||||
|
||||
@Field(() => String, { nullable: true })
|
||||
recordId?: string;
|
||||
}
|
||||
@ -0,0 +1,52 @@
|
||||
import { Inject } from '@nestjs/common';
|
||||
|
||||
import { isDefined } from 'twenty-shared/utils';
|
||||
import { RedisPubSub } from 'graphql-redis-subscriptions';
|
||||
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator';
|
||||
import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator';
|
||||
import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type';
|
||||
import { ObjectRecordEvent } from 'src/engine/core-modules/event-emitter/types/object-record-event.event';
|
||||
import { removeSecretFromWebhookRecord } from 'src/utils/remove-secret-from-webhook-record';
|
||||
|
||||
@Processor(MessageQueue.subscriptionsQueue)
|
||||
export class SubscriptionsJob {
|
||||
constructor(@Inject('PUB_SUB') private readonly pubSub: RedisPubSub) {}
|
||||
|
||||
@Process(SubscriptionsJob.name)
|
||||
async handle(
|
||||
workspaceEventBatch: WorkspaceEventBatch<ObjectRecordEvent>,
|
||||
): Promise<void> {
|
||||
for (const eventData of workspaceEventBatch.events) {
|
||||
const [nameSingular, operation] = workspaceEventBatch.name.split('.');
|
||||
const record =
|
||||
'after' in eventData.properties && isDefined(eventData.properties.after)
|
||||
? eventData.properties.after
|
||||
: 'before' in eventData.properties &&
|
||||
isDefined(eventData.properties.before)
|
||||
? eventData.properties.before
|
||||
: {};
|
||||
const updatedFields =
|
||||
'updatedFields' in eventData.properties
|
||||
? eventData.properties.updatedFields
|
||||
: undefined;
|
||||
|
||||
const isWebhookEvent = nameSingular === 'webhook';
|
||||
const sanitizedRecord = removeSecretFromWebhookRecord(
|
||||
record,
|
||||
isWebhookEvent,
|
||||
);
|
||||
|
||||
await this.pubSub.publish('onDbEvent', {
|
||||
onDbEvent: {
|
||||
action: operation,
|
||||
objectNameSingular: nameSingular,
|
||||
eventDate: new Date(),
|
||||
record: sanitizedRecord,
|
||||
...(updatedFields && { updatedFields }),
|
||||
},
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,34 @@
|
||||
import { Inject, Module, OnModuleDestroy } from '@nestjs/common';
|
||||
|
||||
import { RedisPubSub } from 'graphql-redis-subscriptions';
|
||||
|
||||
import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service';
|
||||
import { SubscriptionsResolver } from 'src/engine/subscriptions/subscriptions.resolver';
|
||||
import { SubscriptionsJob } from 'src/engine/subscriptions/subscriptions.job';
|
||||
|
||||
@Module({
|
||||
exports: ['PUB_SUB'],
|
||||
providers: [
|
||||
{
|
||||
provide: 'PUB_SUB',
|
||||
inject: [RedisClientService],
|
||||
|
||||
useFactory: (redisClientService: RedisClientService) =>
|
||||
new RedisPubSub({
|
||||
publisher: redisClientService.getClient().duplicate(),
|
||||
subscriber: redisClientService.getClient().duplicate(),
|
||||
}),
|
||||
},
|
||||
SubscriptionsResolver,
|
||||
SubscriptionsJob,
|
||||
],
|
||||
})
|
||||
export class SubscriptionsModule implements OnModuleDestroy {
|
||||
constructor(@Inject('PUB_SUB') private readonly pubSub: RedisPubSub) {}
|
||||
|
||||
async onModuleDestroy() {
|
||||
if (this.pubSub) {
|
||||
await this.pubSub.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -0,0 +1,43 @@
|
||||
import { Args, Resolver, Subscription } from '@nestjs/graphql';
|
||||
import { Inject, UseGuards } from '@nestjs/common';
|
||||
|
||||
import { RedisPubSub } from 'graphql-redis-subscriptions';
|
||||
import { isDefined } from 'twenty-shared/utils';
|
||||
|
||||
import { OnDbEventDTO } from 'src/engine/subscriptions/dtos/on-db-event.dto';
|
||||
import { OnDbEventInput } from 'src/engine/subscriptions/dtos/on-db-event.input';
|
||||
import { WorkspaceAuthGuard } from 'src/engine/guards/workspace-auth.guard';
|
||||
import { UserAuthGuard } from 'src/engine/guards/user-auth.guard';
|
||||
|
||||
@Resolver()
|
||||
@UseGuards(WorkspaceAuthGuard, UserAuthGuard)
|
||||
export class SubscriptionsResolver {
|
||||
constructor(@Inject('PUB_SUB') private readonly pubSub: RedisPubSub) {}
|
||||
|
||||
@Subscription(() => OnDbEventDTO, {
|
||||
filter: (
|
||||
payload: { onDbEvent: OnDbEventDTO },
|
||||
variables: { input: OnDbEventInput },
|
||||
) => {
|
||||
const isActionMatching =
|
||||
!isDefined(variables.input.action) ||
|
||||
payload.onDbEvent.action === variables.input.action;
|
||||
|
||||
const isObjectNameSingularMatching =
|
||||
!isDefined(variables.input.objectNameSingular) ||
|
||||
payload.onDbEvent.objectNameSingular ===
|
||||
variables.input.objectNameSingular;
|
||||
|
||||
const isRecordIdMatching =
|
||||
!isDefined(variables.input.recordId) ||
|
||||
payload.onDbEvent.record.id === variables.input.recordId;
|
||||
|
||||
return (
|
||||
isActionMatching && isObjectNameSingularMatching && isRecordIdMatching
|
||||
);
|
||||
},
|
||||
})
|
||||
onDbEvent(@Args('input') _: OnDbEventInput) {
|
||||
return this.pubSub.asyncIterator('onDbEvent');
|
||||
}
|
||||
}
|
||||
@ -12,4 +12,5 @@ export enum WorkflowRunExceptionCode {
|
||||
INVALID_INPUT = 'INVALID_INPUT',
|
||||
WORKFLOW_RUN_LIMIT_REACHED = 'WORKFLOW_RUN_LIMIT_REACHED',
|
||||
WORKFLOW_RUN_INVALID = 'WORKFLOW_RUN_INVALID',
|
||||
FAILURE = 'FAILURE',
|
||||
}
|
||||
|
||||
@ -1,12 +1,19 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
|
||||
import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm';
|
||||
|
||||
import { RecordPositionModule } from 'src/engine/core-modules/record-position/record-position.module';
|
||||
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
|
||||
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
|
||||
import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service';
|
||||
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
|
||||
|
||||
@Module({
|
||||
imports: [WorkflowCommonModule, RecordPositionModule],
|
||||
imports: [
|
||||
WorkflowCommonModule,
|
||||
NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'),
|
||||
RecordPositionModule,
|
||||
],
|
||||
providers: [WorkflowRunWorkspaceService, ScopedWorkspaceContextFactory],
|
||||
exports: [WorkflowRunWorkspaceService],
|
||||
})
|
||||
|
||||
@ -1,4 +1,7 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { InjectRepository } from '@nestjs/typeorm';
|
||||
|
||||
import { Repository } from 'typeorm';
|
||||
|
||||
import { RecordPositionService } from 'src/engine/core-modules/record-position/services/record-position.service';
|
||||
import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type';
|
||||
@ -17,14 +20,20 @@ import {
|
||||
WorkflowRunException,
|
||||
WorkflowRunExceptionCode,
|
||||
} from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception';
|
||||
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
|
||||
import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity';
|
||||
import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action';
|
||||
|
||||
@Injectable()
|
||||
export class WorkflowRunWorkspaceService {
|
||||
constructor(
|
||||
private readonly twentyORMManager: TwentyORMManager,
|
||||
private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService,
|
||||
private readonly recordPositionService: RecordPositionService,
|
||||
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
|
||||
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
|
||||
@InjectRepository(ObjectMetadataEntity, 'metadata')
|
||||
private readonly objectMetadataRepository: Repository<ObjectMetadataEntity>,
|
||||
private readonly recordPositionService: RecordPositionService,
|
||||
) {}
|
||||
|
||||
async createWorkflowRun({
|
||||
@ -131,11 +140,19 @@ export class WorkflowRunWorkspaceService {
|
||||
);
|
||||
}
|
||||
|
||||
return workflowRunRepository.update(workflowRunToUpdate.id, {
|
||||
const partialUpdate = {
|
||||
status: WorkflowRunStatus.RUNNING,
|
||||
startedAt: new Date().toISOString(),
|
||||
context,
|
||||
output,
|
||||
};
|
||||
|
||||
await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate);
|
||||
|
||||
await this.emitWorkflowRunUpdatedEvent({
|
||||
workflowRunBefore: workflowRunToUpdate,
|
||||
diff: partialUpdate,
|
||||
updatedFields: ['status', 'startedAt', 'context', 'output'],
|
||||
});
|
||||
}
|
||||
|
||||
@ -164,13 +181,21 @@ export class WorkflowRunWorkspaceService {
|
||||
);
|
||||
}
|
||||
|
||||
return workflowRunRepository.update(workflowRunToUpdate.id, {
|
||||
const partialUpdate = {
|
||||
status,
|
||||
endedAt: new Date().toISOString(),
|
||||
output: {
|
||||
...(workflowRunToUpdate.output ?? {}),
|
||||
error,
|
||||
},
|
||||
};
|
||||
|
||||
await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate);
|
||||
|
||||
await this.emitWorkflowRunUpdatedEvent({
|
||||
workflowRunBefore: workflowRunToUpdate,
|
||||
diff: partialUpdate,
|
||||
updatedFields: ['status', 'endedAt', 'output'],
|
||||
});
|
||||
}
|
||||
|
||||
@ -199,7 +224,7 @@ export class WorkflowRunWorkspaceService {
|
||||
);
|
||||
}
|
||||
|
||||
return workflowRunRepository.update(workflowRunId, {
|
||||
const partialUpdate = {
|
||||
output: {
|
||||
flow: workflowRunToUpdate.output?.flow ?? {
|
||||
trigger: undefined,
|
||||
@ -211,6 +236,14 @@ export class WorkflowRunWorkspaceService {
|
||||
},
|
||||
},
|
||||
context,
|
||||
};
|
||||
|
||||
await workflowRunRepository.update(workflowRunId, partialUpdate);
|
||||
|
||||
await this.emitWorkflowRunUpdatedEvent({
|
||||
workflowRunBefore: workflowRunToUpdate,
|
||||
diff: partialUpdate,
|
||||
updatedFields: ['context', 'output'],
|
||||
});
|
||||
}
|
||||
|
||||
@ -251,7 +284,7 @@ export class WorkflowRunWorkspaceService {
|
||||
(existingStep) => (step.id === existingStep.id ? step : existingStep),
|
||||
);
|
||||
|
||||
return workflowRunRepository.update(workflowRunToUpdate.id, {
|
||||
const partialUpdate = {
|
||||
output: {
|
||||
...(workflowRunToUpdate.output ?? {}),
|
||||
flow: {
|
||||
@ -259,6 +292,14 @@ export class WorkflowRunWorkspaceService {
|
||||
steps: updatedSteps,
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate);
|
||||
|
||||
await this.emitWorkflowRunUpdatedEvent({
|
||||
workflowRunBefore: workflowRunToUpdate,
|
||||
diff: partialUpdate,
|
||||
updatedFields: ['output'],
|
||||
});
|
||||
}
|
||||
|
||||
@ -283,4 +324,68 @@ export class WorkflowRunWorkspaceService {
|
||||
|
||||
return workflowRun;
|
||||
}
|
||||
|
||||
private async emitWorkflowRunUpdatedEvent({
|
||||
workflowRunBefore,
|
||||
updatedFields,
|
||||
diff,
|
||||
}: {
|
||||
workflowRunBefore: WorkflowRunWorkspaceEntity;
|
||||
updatedFields: string[];
|
||||
diff: object;
|
||||
}) {
|
||||
const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId;
|
||||
|
||||
if (!workspaceId) {
|
||||
return;
|
||||
}
|
||||
|
||||
const objectMetadata = await this.objectMetadataRepository.findOne({
|
||||
where: {
|
||||
nameSingular: 'workflowRun',
|
||||
workspaceId,
|
||||
},
|
||||
});
|
||||
|
||||
if (!objectMetadata) {
|
||||
throw new WorkflowRunException(
|
||||
'Object metadata not found',
|
||||
WorkflowRunExceptionCode.FAILURE,
|
||||
);
|
||||
}
|
||||
|
||||
const workflowRunRepository =
|
||||
await this.twentyORMManager.getRepository<WorkflowRunWorkspaceEntity>(
|
||||
'workflowRun',
|
||||
);
|
||||
|
||||
const workflowRunAfter = await workflowRunRepository.findOneBy({
|
||||
id: workflowRunBefore.id,
|
||||
});
|
||||
|
||||
if (!workflowRunAfter) {
|
||||
throw new WorkflowRunException(
|
||||
'WorkflowRun not found',
|
||||
WorkflowRunExceptionCode.FAILURE,
|
||||
);
|
||||
}
|
||||
|
||||
this.workspaceEventEmitter.emitDatabaseBatchEvent({
|
||||
objectMetadataNameSingular: 'workflowRun',
|
||||
action: DatabaseEventAction.UPDATED,
|
||||
events: [
|
||||
{
|
||||
recordId: workflowRunBefore.id,
|
||||
objectMetadata,
|
||||
properties: {
|
||||
after: workflowRunAfter,
|
||||
before: workflowRunBefore,
|
||||
updatedFields,
|
||||
diff,
|
||||
},
|
||||
},
|
||||
],
|
||||
workspaceId,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user