diff --git a/package.json b/package.json index 2bc75888e..52f7c9325 100644 --- a/package.json +++ b/package.json @@ -99,7 +99,9 @@ "graphql-fields": "^2.0.3", "graphql-middleware": "^6.1.35", "graphql-rate-limit": "^3.3.0", + "graphql-redis-subscriptions": "^2.7.0", "graphql-scalars": "^1.23.0", + "graphql-sse": "^2.5.4", "graphql-subscriptions": "2.0.0", "graphql-tag": "^2.12.6", "graphql-type-json": "^0.3.2", @@ -354,7 +356,8 @@ "type-fest": "4.10.1", "typescript": "5.3.3", "prosemirror-model": "1.23.0", - "yjs": "13.6.18" + "yjs": "13.6.18", + "graphql-redis-subscriptions/ioredis": "^5.6.0" }, "version": "0.2.1", "nx": {}, diff --git a/packages/twenty-front/codegen.cjs b/packages/twenty-front/codegen.cjs index 05effffb7..f24c09648 100644 --- a/packages/twenty-front/codegen.cjs +++ b/packages/twenty-front/codegen.cjs @@ -12,7 +12,6 @@ module.exports = { '!./src/**/*.test.tsx', '!./src/**/*.stories.tsx', '!./src/**/__mocks__/*.ts', - '!./src/modules/users/graphql/queries/getCurrentUserAndViews.ts', ], overwrite: true, generates: { diff --git a/packages/twenty-front/jest.config.ts b/packages/twenty-front/jest.config.ts index 510d67bf1..2980ce3ab 100644 --- a/packages/twenty-front/jest.config.ts +++ b/packages/twenty-front/jest.config.ts @@ -69,6 +69,7 @@ const jestConfig: JestConfigWithTsJest = { 'config/*', 'graphql/queries/*', 'graphql/mutations/*', + 'graphql/subscriptions/*', 'graphql/fragments/*', 'types/*', 'constants/*', diff --git a/packages/twenty-front/nyc.config.cjs b/packages/twenty-front/nyc.config.cjs index 29f6d82cb..4ca921c13 100644 --- a/packages/twenty-front/nyc.config.cjs +++ b/packages/twenty-front/nyc.config.cjs @@ -9,7 +9,7 @@ const globalCoverage = { const modulesCoverage = { branches: 25, - statements: 44, + statements: 43, lines: 44, functions: 38, include: ['src/modules/**/*'], diff --git a/packages/twenty-front/src/generated/graphql.tsx b/packages/twenty-front/src/generated/graphql.tsx index 6ebb41371..59299c719 100644 --- a/packages/twenty-front/src/generated/graphql.tsx +++ b/packages/twenty-front/src/generated/graphql.tsx @@ -453,6 +453,15 @@ export type CustomDomainValidRecords = { records: Array; }; +/** Database Event Action */ +export enum DatabaseEventAction { + CREATED = 'CREATED', + DELETED = 'DELETED', + DESTROYED = 'DESTROYED', + RESTORED = 'RESTORED', + UPDATED = 'UPDATED' +} + export type DateFilter = { eq?: InputMaybe; gt?: InputMaybe; @@ -1366,6 +1375,21 @@ export type ObjectStandardOverrides = { translations?: Maybe; }; +export type OnDbEventDto = { + __typename?: 'OnDbEventDTO'; + action: DatabaseEventAction; + eventDate: Scalars['DateTime']; + objectNameSingular: Scalars['String']; + record: Scalars['JSON']; + updatedFields?: Maybe>; +}; + +export type OnDbEventInput = { + action?: InputMaybe; + objectNameSingular?: InputMaybe; + recordId?: InputMaybe; +}; + /** Onboarding status */ export enum OnboardingStatus { COMPLETED = 'COMPLETED', @@ -1885,6 +1909,16 @@ export type SubmitFormStepInput = { workflowRunId: Scalars['String']; }; +export type Subscription = { + __typename?: 'Subscription'; + onDbEvent: OnDbEventDto; +}; + + +export type SubscriptionOnDbEventArgs = { + input: OnDbEventInput; +}; + export enum SubscriptionInterval { Day = 'Day', Month = 'Month', @@ -2796,6 +2830,13 @@ export type GetSsoIdentityProvidersQueryVariables = Exact<{ [key: string]: never export type GetSsoIdentityProvidersQuery = { __typename?: 'Query', getSSOIdentityProviders: Array<{ __typename?: 'FindAvailableSSOIDPOutput', type: IdentityProviderType, id: string, name: string, issuer: string, status: SsoIdentityProviderStatus }> }; +export type OnDbEventSubscriptionVariables = Exact<{ + input: OnDbEventInput; +}>; + + +export type OnDbEventSubscription = { __typename?: 'Subscription', onDbEvent: { __typename?: 'OnDbEventDTO', eventDate: string, action: DatabaseEventAction, objectNameSingular: string, updatedFields?: Array | null, record: any } }; + export type UserQueryFragmentFragment = { __typename?: 'User', id: any, firstName: string, lastName: string, email: string, canAccessFullAdminPanel: boolean, canImpersonate: boolean, supportUserHash?: string | null, onboardingStatus?: OnboardingStatus | null, userVars: any, workspaceMember?: { __typename?: 'WorkspaceMember', id: any, colorScheme: string, avatarUrl?: string | null, locale?: string | null, userEmail: string, timeZone?: string | null, dateFormat?: WorkspaceMemberDateFormatEnum | null, timeFormat?: WorkspaceMemberTimeFormatEnum | null, name: { __typename?: 'FullName', firstName: string, lastName: string } } | null, workspaceMembers?: Array<{ __typename?: 'WorkspaceMember', id: any, colorScheme: string, avatarUrl?: string | null, locale?: string | null, userEmail: string, timeZone?: string | null, dateFormat?: WorkspaceMemberDateFormatEnum | null, timeFormat?: WorkspaceMemberTimeFormatEnum | null, name: { __typename?: 'FullName', firstName: string, lastName: string } }> | null, currentUserWorkspace?: { __typename?: 'UserWorkspace', settingsPermissions?: Array | null, objectRecordsPermissions?: Array | null } | null, currentWorkspace?: { __typename?: 'Workspace', id: any, displayName?: string | null, logo?: string | null, inviteHash?: string | null, allowImpersonation: boolean, activationStatus: WorkspaceActivationStatus, isPublicInviteLinkEnabled: boolean, isGoogleAuthEnabled: boolean, isMicrosoftAuthEnabled: boolean, isPasswordAuthEnabled: boolean, subdomain: string, hasValidEnterpriseKey: boolean, customDomain?: string | null, isCustomDomainEnabled: boolean, metadataVersion: number, workspaceMembersCount?: number | null, workspaceUrls: { __typename?: 'WorkspaceUrls', subdomainUrl: string, customUrl?: string | null }, featureFlags?: Array<{ __typename?: 'FeatureFlagDTO', key: FeatureFlagKey, value: boolean }> | null, currentBillingSubscription?: { __typename?: 'BillingSubscription', id: any, status: SubscriptionStatus, interval?: SubscriptionInterval | null, billingSubscriptionItems?: Array<{ __typename?: 'BillingSubscriptionItem', id: any, hasReachedCurrentPeriodCap: boolean, billingProduct?: { __typename?: 'BillingProduct', name: string, description: string, metadata: { __typename?: 'BillingProductMetadata', planKey: BillingPlanKey, priceUsageBased: BillingUsageType, productKey: BillingProductKey } } | null }> | null } | null, billingSubscriptions: Array<{ __typename?: 'BillingSubscription', id: any, status: SubscriptionStatus }>, defaultRole?: { __typename?: 'Role', id: string, label: string, description?: string | null, icon?: string | null, canUpdateAllSettings: boolean, isEditable: boolean, canReadAllObjectRecords: boolean, canUpdateAllObjectRecords: boolean, canSoftDeleteAllObjectRecords: boolean, canDestroyAllObjectRecords: boolean } | null } | null, workspaces: Array<{ __typename?: 'UserWorkspace', workspace?: { __typename?: 'Workspace', id: any, logo?: string | null, displayName?: string | null, subdomain: string, customDomain?: string | null, workspaceUrls: { __typename?: 'WorkspaceUrls', subdomainUrl: string, customUrl?: string | null } } | null }> }; export type DeleteUserAccountMutationVariables = Exact<{ [key: string]: never; }>; @@ -5420,6 +5461,40 @@ export function useGetSsoIdentityProvidersLazyQuery(baseOptions?: Apollo.LazyQue export type GetSsoIdentityProvidersQueryHookResult = ReturnType; export type GetSsoIdentityProvidersLazyQueryHookResult = ReturnType; export type GetSsoIdentityProvidersQueryResult = Apollo.QueryResult; +export const OnDbEventDocument = gql` + subscription OnDbEvent($input: OnDbEventInput!) { + onDbEvent(input: $input) { + eventDate + action + objectNameSingular + updatedFields + record + } +} + `; + +/** + * __useOnDbEventSubscription__ + * + * To run a query within a React component, call `useOnDbEventSubscription` and pass it any options that fit your needs. + * When your component renders, `useOnDbEventSubscription` returns an object from Apollo Client that contains loading, error, and data properties + * you can use to render your UI. + * + * @param baseOptions options that will be passed into the subscription, supported options are listed on: https://www.apollographql.com/docs/react/api/react-hooks/#options; + * + * @example + * const { data, loading, error } = useOnDbEventSubscription({ + * variables: { + * input: // value for 'input' + * }, + * }); + */ +export function useOnDbEventSubscription(baseOptions: Apollo.SubscriptionHookOptions) { + const options = {...defaultOptions, ...baseOptions} + return Apollo.useSubscription(OnDbEventDocument, options); + } +export type OnDbEventSubscriptionHookResult = ReturnType; +export type OnDbEventSubscriptionResult = Apollo.SubscriptionResult; export const DeleteUserAccountDocument = gql` mutation DeleteUserAccount { deleteUser { diff --git a/packages/twenty-front/src/modules/apollo/services/apollo.factory.ts b/packages/twenty-front/src/modules/apollo/services/apollo.factory.ts index 97192aa9c..d5966648b 100644 --- a/packages/twenty-front/src/modules/apollo/services/apollo.factory.ts +++ b/packages/twenty-front/src/modules/apollo/services/apollo.factory.ts @@ -23,6 +23,7 @@ import { cookieStorage } from '~/utils/cookie-storage'; import { isUndefinedOrNull } from '~/utils/isUndefinedOrNull'; import { ApolloManager } from '../types/apolloManager.interface'; import { loggerLink } from '../utils/loggerLink'; +import { getTokenPair } from '../utils/getTokenPair'; const logger = loggerLink(() => 'Twenty'); @@ -55,14 +56,6 @@ export class ApolloFactory implements ApolloManager { this.currentWorkspaceMember = currentWorkspaceMember; - const getTokenPair = () => { - const stringTokenPair = cookieStorage.getItem('tokenPair'); - const tokenPair = isDefined(stringTokenPair) - ? (JSON.parse(stringTokenPair) as AuthTokenPair) - : undefined; - return tokenPair; - }; - const buildApolloLink = (): ApolloLink => { const httpLink = createUploadLink({ uri, diff --git a/packages/twenty-front/src/modules/apollo/utils/getTokenPair.ts b/packages/twenty-front/src/modules/apollo/utils/getTokenPair.ts new file mode 100644 index 000000000..b8ba4ac41 --- /dev/null +++ b/packages/twenty-front/src/modules/apollo/utils/getTokenPair.ts @@ -0,0 +1,10 @@ +import { cookieStorage } from '~/utils/cookie-storage'; +import { isDefined } from 'twenty-shared/utils'; +import { AuthTokenPair } from '~/generated/graphql'; + +export const getTokenPair = () => { + const stringTokenPair = cookieStorage.getItem('tokenPair'); + return isDefined(stringTokenPair) + ? (JSON.parse(stringTokenPair) as AuthTokenPair) + : undefined; +}; diff --git a/packages/twenty-front/src/modules/object-record/record-show/components/CardComponents.tsx b/packages/twenty-front/src/modules/object-record/record-show/components/CardComponents.tsx index 4fd8631ac..62ddeba26 100644 --- a/packages/twenty-front/src/modules/object-record/record-show/components/CardComponents.tsx +++ b/packages/twenty-front/src/modules/object-record/record-show/components/CardComponents.tsx @@ -15,6 +15,7 @@ import { WorkflowVersionVisualizerEffect } from '@/workflow/workflow-diagram/com import { WorkflowVisualizer } from '@/workflow/workflow-diagram/components/WorkflowVisualizer'; import { WorkflowVisualizerEffect } from '@/workflow/workflow-diagram/components/WorkflowVisualizerEffect'; import styled from '@emotion/styled'; +import { ListenRecordUpdatesEffect } from '@/subscription/components/ListenUpdatesEffect'; const StyledGreyBox = styled.div<{ isInRightDrawer?: boolean }>` background: ${({ theme, isInRightDrawer }) => @@ -99,6 +100,11 @@ export const CardComponents: Record = { [CardType.WorkflowRunCard]: ({ targetableObject }) => ( <> + diff --git a/packages/twenty-front/src/modules/subscription/components/ListenUpdatesEffect.tsx b/packages/twenty-front/src/modules/subscription/components/ListenUpdatesEffect.tsx new file mode 100644 index 000000000..f740a04f8 --- /dev/null +++ b/packages/twenty-front/src/modules/subscription/components/ListenUpdatesEffect.tsx @@ -0,0 +1,45 @@ +import { useApolloClient } from '@apollo/client'; +import { useOnDbEvent } from '@/subscription/hooks/useOnDbEvent'; +import { DatabaseEventAction } from '~/generated/graphql'; +import { capitalize, isDefined } from 'twenty-shared/utils'; + +type ListenRecordUpdatesEffectProps = { + objectNameSingular: string; + recordId: string; + listenedFields: string[]; +}; + +export const ListenRecordUpdatesEffect = ({ + objectNameSingular, + recordId, + listenedFields, +}: ListenRecordUpdatesEffectProps) => { + const apolloClient = useApolloClient(); + + useOnDbEvent({ + input: { recordId, action: DatabaseEventAction.UPDATED }, + onData: (data) => { + const updatedRecord = data.onDbEvent.record; + + const fieldsUpdater = listenedFields.reduce((acc, listenedField) => { + if (!isDefined(updatedRecord[listenedField])) { + return acc; + } + return { + ...acc, + [listenedField]: () => updatedRecord[listenedField], + }; + }, {}); + + apolloClient.cache.modify({ + id: apolloClient.cache.identify({ + __typename: capitalize(objectNameSingular), + id: recordId, + }), + fields: fieldsUpdater, + }); + }, + }); + + return null; +}; diff --git a/packages/twenty-front/src/modules/subscription/graphql/subscriptions/onDbEvent.ts b/packages/twenty-front/src/modules/subscription/graphql/subscriptions/onDbEvent.ts new file mode 100644 index 000000000..b106ccde0 --- /dev/null +++ b/packages/twenty-front/src/modules/subscription/graphql/subscriptions/onDbEvent.ts @@ -0,0 +1,13 @@ +import { gql } from '@apollo/client'; + +export const ON_DB_EVENT = gql` + subscription OnDbEvent($input: OnDbEventInput!) { + onDbEvent(input: $input) { + eventDate + action + objectNameSingular + updatedFields + record + } + } +`; diff --git a/packages/twenty-front/src/modules/subscription/hooks/useOnDbEvent.ts b/packages/twenty-front/src/modules/subscription/hooks/useOnDbEvent.ts new file mode 100644 index 000000000..1712ad930 --- /dev/null +++ b/packages/twenty-front/src/modules/subscription/hooks/useOnDbEvent.ts @@ -0,0 +1,58 @@ +import { useEffect, useMemo } from 'react'; +import { createClient } from 'graphql-sse'; +import { ON_DB_EVENT } from '@/subscription/graphql/subscriptions/onDbEvent'; +import { Subscription, SubscriptionOnDbEventArgs } from '~/generated/graphql'; +import { REACT_APP_SERVER_BASE_URL } from '~/config'; +import { getTokenPair } from '@/apollo/utils/getTokenPair'; + +type OnDbEventArgs = SubscriptionOnDbEventArgs & { + skip?: boolean; + onData?: (data: Subscription) => void; + onError?: (err: any) => void; + onComplete?: () => void; +}; + +export const useOnDbEvent = ({ + onData, + onError, + onComplete, + input, + skip = false, +}: OnDbEventArgs) => { + const tokenPair = getTokenPair(); + + const sseClient = useMemo(() => { + return createClient({ + url: `${REACT_APP_SERVER_BASE_URL}/graphql`, + headers: { + Authorization: tokenPair?.accessToken.token + ? `Bearer ${tokenPair?.accessToken.token}` + : '', + }, + }); + }, [tokenPair?.accessToken.token]); + + useEffect(() => { + if (skip === true) { + return; + } + const next = (value: { data: Subscription }) => onData?.(value.data); + const error = (err: unknown) => onError?.(err); + const complete = () => onComplete?.(); + const unsubscribe = sseClient.subscribe( + { + query: ON_DB_EVENT.loc?.source.body || '', + variables: { input }, + }, + { + next, + error, + complete, + }, + ); + + return () => { + unsubscribe(); + }; + }, [input, onComplete, onData, onError, skip, sseClient]); +}; diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/entity-events-to-db.listener.ts b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/entity-events-to-db.listener.ts index cef535df2..bfb14d4ea 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/entity-events-to-db.listener.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-query-runner/listeners/entity-events-to-db.listener.ts @@ -16,6 +16,7 @@ import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/wo import { CreateAuditLogFromInternalEvent } from 'src/modules/timeline/jobs/create-audit-log-from-internal-event'; import { UpsertTimelineActivityFromInternalEvent } from 'src/modules/timeline/jobs/upsert-timeline-activity-from-internal-event.job'; import { CallWebhookJobsJob } from 'src/modules/webhook/jobs/call-webhook-jobs.job'; +import { SubscriptionsJob } from 'src/engine/subscriptions/subscriptions.job'; @Injectable() export class EntityEventsToDbListener { @@ -24,6 +25,8 @@ export class EntityEventsToDbListener { private readonly entityEventsToDbQueueService: MessageQueueService, @InjectMessageQueue(MessageQueue.webhookQueue) private readonly webhookQueueService: MessageQueueService, + @InjectMessageQueue(MessageQueue.subscriptionsQueue) + private readonly subscriptionsQueueService: MessageQueueService, ) {} @OnDatabaseBatchEvent('*', DatabaseEventAction.CREATED) @@ -64,6 +67,11 @@ export class EntityEventsToDbListener { ); await Promise.all([ + this.subscriptionsQueueService.add>( + SubscriptionsJob.name, + batchEvent, + { retryLimit: 3 }, + ), this.webhookQueueService.add>( CallWebhookJobsJob.name, batchEvent, diff --git a/packages/twenty-server/src/engine/api/graphql/workspace-resolver-builder/workspace-resolver-builder.module.ts b/packages/twenty-server/src/engine/api/graphql/workspace-resolver-builder/workspace-resolver-builder.module.ts index 028c2b74e..89b8b3689 100644 --- a/packages/twenty-server/src/engine/api/graphql/workspace-resolver-builder/workspace-resolver-builder.module.ts +++ b/packages/twenty-server/src/engine/api/graphql/workspace-resolver-builder/workspace-resolver-builder.module.ts @@ -1,7 +1,6 @@ import { Module } from '@nestjs/common'; import { GraphqlQueryRunnerModule } from 'src/engine/api/graphql/graphql-query-runner/graphql-query-runner.module'; -import { WorkspaceQueryRunnerModule } from 'src/engine/api/graphql/workspace-query-runner/workspace-query-runner.module'; import { WorkspaceResolverBuilderService } from 'src/engine/api/graphql/workspace-resolver-builder/workspace-resolver-builder.service'; import { FeatureFlagModule } from 'src/engine/core-modules/feature-flag/feature-flag.module'; @@ -10,11 +9,7 @@ import { WorkspaceResolverFactory } from './workspace-resolver.factory'; import { workspaceResolverBuilderFactories } from './factories/factories'; @Module({ - imports: [ - WorkspaceQueryRunnerModule, - GraphqlQueryRunnerModule, - FeatureFlagModule, - ], + imports: [GraphqlQueryRunnerModule, FeatureFlagModule], providers: [ ...workspaceResolverBuilderFactories, WorkspaceResolverFactory, diff --git a/packages/twenty-server/src/engine/core-modules/core-engine.module.ts b/packages/twenty-server/src/engine/core-modules/core-engine.module.ts index 15e616085..e5e2d2b87 100644 --- a/packages/twenty-server/src/engine/core-modules/core-engine.module.ts +++ b/packages/twenty-server/src/engine/core-modules/core-engine.module.ts @@ -48,6 +48,8 @@ import { WorkspaceInvitationModule } from 'src/engine/core-modules/workspace-inv import { WorkspaceModule } from 'src/engine/core-modules/workspace/workspace.module'; import { RoleModule } from 'src/engine/metadata-modules/role/role.module'; import { WorkspaceEventEmitterModule } from 'src/engine/workspace-event-emitter/workspace-event-emitter.module'; +import { WorkspaceQueryRunnerModule } from 'src/engine/api/graphql/workspace-query-runner/workspace-query-runner.module'; +import { SubscriptionsModule } from 'src/engine/subscriptions/subscriptions.module'; import { AnalyticsModule } from './analytics/analytics.module'; import { ClientConfigModule } from './client-config/client-config.module'; @@ -81,6 +83,8 @@ import { FileModule } from './file/file.module'; RoleModule, TwentyConfigModule, RedisClientModule, + WorkspaceQueryRunnerModule, + SubscriptionsModule, FileStorageModule.forRootAsync({ useFactory: fileStorageModuleFactory, inject: [TwentyConfigService], diff --git a/packages/twenty-server/src/engine/core-modules/message-queue/jobs.module.ts b/packages/twenty-server/src/engine/core-modules/message-queue/jobs.module.ts index d7b8c87ff..2fe7d799e 100644 --- a/packages/twenty-server/src/engine/core-modules/message-queue/jobs.module.ts +++ b/packages/twenty-server/src/engine/core-modules/message-queue/jobs.module.ts @@ -32,6 +32,7 @@ import { TimelineJobModule } from 'src/modules/timeline/jobs/timeline-job.module import { TimelineActivityModule } from 'src/modules/timeline/timeline-activity.module'; import { WebhookJobModule } from 'src/modules/webhook/jobs/webhook-job.module'; import { WorkflowModule } from 'src/modules/workflow/workflow.module'; +import { SubscriptionsModule } from 'src/engine/subscriptions/subscriptions.module'; @Module({ imports: [ @@ -58,6 +59,7 @@ import { WorkflowModule } from 'src/modules/workflow/workflow.module'; WorkflowModule, FavoriteModule, WorkspaceCleanerModule, + SubscriptionsModule, ], providers: [ CleanSuspendedWorkspacesJob, diff --git a/packages/twenty-server/src/engine/core-modules/message-queue/message-queue.constants.ts b/packages/twenty-server/src/engine/core-modules/message-queue/message-queue.constants.ts index e6f0f11a4..f5e2baa11 100644 --- a/packages/twenty-server/src/engine/core-modules/message-queue/message-queue.constants.ts +++ b/packages/twenty-server/src/engine/core-modules/message-queue/message-queue.constants.ts @@ -19,4 +19,5 @@ export enum MessageQueue { workflowQueue = 'workflow-queue', serverlessFunctionQueue = 'serverless-function-queue', deleteCascadeQueue = 'delete-cascade-queue', + subscriptionsQueue = 'subscriptions-queue', } diff --git a/packages/twenty-server/src/engine/subscriptions/dtos/on-db-event.dto.ts b/packages/twenty-server/src/engine/subscriptions/dtos/on-db-event.dto.ts new file mode 100644 index 000000000..d3de16c8c --- /dev/null +++ b/packages/twenty-server/src/engine/subscriptions/dtos/on-db-event.dto.ts @@ -0,0 +1,30 @@ +import { Field, ObjectType, registerEnumType } from '@nestjs/graphql'; + +import GraphQLJSON from 'graphql-type-json'; + +import { ObjectRecord } from 'src/engine/api/graphql/workspace-query-builder/interfaces/object-record.interface'; + +import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; + +registerEnumType(DatabaseEventAction, { + name: 'DatabaseEventAction', + description: 'Database Event Action', +}); + +@ObjectType() +export class OnDbEventDTO { + @Field(() => DatabaseEventAction) + action: DatabaseEventAction; + + @Field(() => String) + objectNameSingular: string; + + @Field() + eventDate: Date; + + @Field(() => GraphQLJSON) + record: ObjectRecord; + + @Field(() => [String], { nullable: true }) + updatedFields?: string[]; +} diff --git a/packages/twenty-server/src/engine/subscriptions/dtos/on-db-event.input.ts b/packages/twenty-server/src/engine/subscriptions/dtos/on-db-event.input.ts new file mode 100644 index 000000000..a38526ec4 --- /dev/null +++ b/packages/twenty-server/src/engine/subscriptions/dtos/on-db-event.input.ts @@ -0,0 +1,15 @@ +import { Field, InputType } from '@nestjs/graphql'; + +import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; + +@InputType() +export class OnDbEventInput { + @Field(() => DatabaseEventAction, { nullable: true }) + action?: DatabaseEventAction; + + @Field(() => String, { nullable: true }) + objectNameSingular?: string; + + @Field(() => String, { nullable: true }) + recordId?: string; +} diff --git a/packages/twenty-server/src/engine/subscriptions/subscriptions.job.ts b/packages/twenty-server/src/engine/subscriptions/subscriptions.job.ts new file mode 100644 index 000000000..5a88b2404 --- /dev/null +++ b/packages/twenty-server/src/engine/subscriptions/subscriptions.job.ts @@ -0,0 +1,52 @@ +import { Inject } from '@nestjs/common'; + +import { isDefined } from 'twenty-shared/utils'; +import { RedisPubSub } from 'graphql-redis-subscriptions'; + +import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; +import { Processor } from 'src/engine/core-modules/message-queue/decorators/processor.decorator'; +import { Process } from 'src/engine/core-modules/message-queue/decorators/process.decorator'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type'; +import { ObjectRecordEvent } from 'src/engine/core-modules/event-emitter/types/object-record-event.event'; +import { removeSecretFromWebhookRecord } from 'src/utils/remove-secret-from-webhook-record'; + +@Processor(MessageQueue.subscriptionsQueue) +export class SubscriptionsJob { + constructor(@Inject('PUB_SUB') private readonly pubSub: RedisPubSub) {} + + @Process(SubscriptionsJob.name) + async handle( + workspaceEventBatch: WorkspaceEventBatch, + ): Promise { + for (const eventData of workspaceEventBatch.events) { + const [nameSingular, operation] = workspaceEventBatch.name.split('.'); + const record = + 'after' in eventData.properties && isDefined(eventData.properties.after) + ? eventData.properties.after + : 'before' in eventData.properties && + isDefined(eventData.properties.before) + ? eventData.properties.before + : {}; + const updatedFields = + 'updatedFields' in eventData.properties + ? eventData.properties.updatedFields + : undefined; + + const isWebhookEvent = nameSingular === 'webhook'; + const sanitizedRecord = removeSecretFromWebhookRecord( + record, + isWebhookEvent, + ); + + await this.pubSub.publish('onDbEvent', { + onDbEvent: { + action: operation, + objectNameSingular: nameSingular, + eventDate: new Date(), + record: sanitizedRecord, + ...(updatedFields && { updatedFields }), + }, + }); + } + } +} diff --git a/packages/twenty-server/src/engine/subscriptions/subscriptions.module.ts b/packages/twenty-server/src/engine/subscriptions/subscriptions.module.ts new file mode 100644 index 000000000..1d64cbd4b --- /dev/null +++ b/packages/twenty-server/src/engine/subscriptions/subscriptions.module.ts @@ -0,0 +1,34 @@ +import { Inject, Module, OnModuleDestroy } from '@nestjs/common'; + +import { RedisPubSub } from 'graphql-redis-subscriptions'; + +import { RedisClientService } from 'src/engine/core-modules/redis-client/redis-client.service'; +import { SubscriptionsResolver } from 'src/engine/subscriptions/subscriptions.resolver'; +import { SubscriptionsJob } from 'src/engine/subscriptions/subscriptions.job'; + +@Module({ + exports: ['PUB_SUB'], + providers: [ + { + provide: 'PUB_SUB', + inject: [RedisClientService], + + useFactory: (redisClientService: RedisClientService) => + new RedisPubSub({ + publisher: redisClientService.getClient().duplicate(), + subscriber: redisClientService.getClient().duplicate(), + }), + }, + SubscriptionsResolver, + SubscriptionsJob, + ], +}) +export class SubscriptionsModule implements OnModuleDestroy { + constructor(@Inject('PUB_SUB') private readonly pubSub: RedisPubSub) {} + + async onModuleDestroy() { + if (this.pubSub) { + await this.pubSub.close(); + } + } +} diff --git a/packages/twenty-server/src/engine/subscriptions/subscriptions.resolver.ts b/packages/twenty-server/src/engine/subscriptions/subscriptions.resolver.ts new file mode 100644 index 000000000..c663be28d --- /dev/null +++ b/packages/twenty-server/src/engine/subscriptions/subscriptions.resolver.ts @@ -0,0 +1,43 @@ +import { Args, Resolver, Subscription } from '@nestjs/graphql'; +import { Inject, UseGuards } from '@nestjs/common'; + +import { RedisPubSub } from 'graphql-redis-subscriptions'; +import { isDefined } from 'twenty-shared/utils'; + +import { OnDbEventDTO } from 'src/engine/subscriptions/dtos/on-db-event.dto'; +import { OnDbEventInput } from 'src/engine/subscriptions/dtos/on-db-event.input'; +import { WorkspaceAuthGuard } from 'src/engine/guards/workspace-auth.guard'; +import { UserAuthGuard } from 'src/engine/guards/user-auth.guard'; + +@Resolver() +@UseGuards(WorkspaceAuthGuard, UserAuthGuard) +export class SubscriptionsResolver { + constructor(@Inject('PUB_SUB') private readonly pubSub: RedisPubSub) {} + + @Subscription(() => OnDbEventDTO, { + filter: ( + payload: { onDbEvent: OnDbEventDTO }, + variables: { input: OnDbEventInput }, + ) => { + const isActionMatching = + !isDefined(variables.input.action) || + payload.onDbEvent.action === variables.input.action; + + const isObjectNameSingularMatching = + !isDefined(variables.input.objectNameSingular) || + payload.onDbEvent.objectNameSingular === + variables.input.objectNameSingular; + + const isRecordIdMatching = + !isDefined(variables.input.recordId) || + payload.onDbEvent.record.id === variables.input.recordId; + + return ( + isActionMatching && isObjectNameSingularMatching && isRecordIdMatching + ); + }, + }) + onDbEvent(@Args('input') _: OnDbEventInput) { + return this.pubSub.asyncIterator('onDbEvent'); + } +} diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts index ecace5fb8..299e334a7 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/exceptions/workflow-run.exception.ts @@ -12,4 +12,5 @@ export enum WorkflowRunExceptionCode { INVALID_INPUT = 'INVALID_INPUT', WORKFLOW_RUN_LIMIT_REACHED = 'WORKFLOW_RUN_LIMIT_REACHED', WORKFLOW_RUN_INVALID = 'WORKFLOW_RUN_INVALID', + FAILURE = 'FAILURE', } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.module.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.module.ts index 00d43bf22..ea3fbabc3 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.module.ts @@ -1,12 +1,19 @@ import { Module } from '@nestjs/common'; +import { NestjsQueryTypeOrmModule } from '@ptc-org/nestjs-query-typeorm'; + import { RecordPositionModule } from 'src/engine/core-modules/record-position/record-position.module'; import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service'; +import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; @Module({ - imports: [WorkflowCommonModule, RecordPositionModule], + imports: [ + WorkflowCommonModule, + NestjsQueryTypeOrmModule.forFeature([ObjectMetadataEntity], 'metadata'), + RecordPositionModule, + ], providers: [WorkflowRunWorkspaceService, ScopedWorkspaceContextFactory], exports: [WorkflowRunWorkspaceService], }) diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts index 912f7faa6..26248755c 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-run/workflow-run.workspace-service.ts @@ -1,4 +1,7 @@ import { Injectable } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { Repository } from 'typeorm'; import { RecordPositionService } from 'src/engine/core-modules/record-position/services/record-position.service'; import { ActorMetadata } from 'src/engine/metadata-modules/field-metadata/composite-types/actor.composite-type'; @@ -17,14 +20,20 @@ import { WorkflowRunException, WorkflowRunExceptionCode, } from 'src/modules/workflow/workflow-runner/exceptions/workflow-run.exception'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; +import { ObjectMetadataEntity } from 'src/engine/metadata-modules/object-metadata/object-metadata.entity'; +import { DatabaseEventAction } from 'src/engine/api/graphql/graphql-query-runner/enums/database-event-action'; @Injectable() export class WorkflowRunWorkspaceService { constructor( private readonly twentyORMManager: TwentyORMManager, private readonly workflowCommonWorkspaceService: WorkflowCommonWorkspaceService, - private readonly recordPositionService: RecordPositionService, private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, + @InjectRepository(ObjectMetadataEntity, 'metadata') + private readonly objectMetadataRepository: Repository, + private readonly recordPositionService: RecordPositionService, ) {} async createWorkflowRun({ @@ -131,11 +140,19 @@ export class WorkflowRunWorkspaceService { ); } - return workflowRunRepository.update(workflowRunToUpdate.id, { + const partialUpdate = { status: WorkflowRunStatus.RUNNING, startedAt: new Date().toISOString(), context, output, + }; + + await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate); + + await this.emitWorkflowRunUpdatedEvent({ + workflowRunBefore: workflowRunToUpdate, + diff: partialUpdate, + updatedFields: ['status', 'startedAt', 'context', 'output'], }); } @@ -164,13 +181,21 @@ export class WorkflowRunWorkspaceService { ); } - return workflowRunRepository.update(workflowRunToUpdate.id, { + const partialUpdate = { status, endedAt: new Date().toISOString(), output: { ...(workflowRunToUpdate.output ?? {}), error, }, + }; + + await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate); + + await this.emitWorkflowRunUpdatedEvent({ + workflowRunBefore: workflowRunToUpdate, + diff: partialUpdate, + updatedFields: ['status', 'endedAt', 'output'], }); } @@ -199,7 +224,7 @@ export class WorkflowRunWorkspaceService { ); } - return workflowRunRepository.update(workflowRunId, { + const partialUpdate = { output: { flow: workflowRunToUpdate.output?.flow ?? { trigger: undefined, @@ -211,6 +236,14 @@ export class WorkflowRunWorkspaceService { }, }, context, + }; + + await workflowRunRepository.update(workflowRunId, partialUpdate); + + await this.emitWorkflowRunUpdatedEvent({ + workflowRunBefore: workflowRunToUpdate, + diff: partialUpdate, + updatedFields: ['context', 'output'], }); } @@ -251,7 +284,7 @@ export class WorkflowRunWorkspaceService { (existingStep) => (step.id === existingStep.id ? step : existingStep), ); - return workflowRunRepository.update(workflowRunToUpdate.id, { + const partialUpdate = { output: { ...(workflowRunToUpdate.output ?? {}), flow: { @@ -259,6 +292,14 @@ export class WorkflowRunWorkspaceService { steps: updatedSteps, }, }, + }; + + await workflowRunRepository.update(workflowRunToUpdate.id, partialUpdate); + + await this.emitWorkflowRunUpdatedEvent({ + workflowRunBefore: workflowRunToUpdate, + diff: partialUpdate, + updatedFields: ['output'], }); } @@ -283,4 +324,68 @@ export class WorkflowRunWorkspaceService { return workflowRun; } + + private async emitWorkflowRunUpdatedEvent({ + workflowRunBefore, + updatedFields, + diff, + }: { + workflowRunBefore: WorkflowRunWorkspaceEntity; + updatedFields: string[]; + diff: object; + }) { + const workspaceId = this.scopedWorkspaceContextFactory.create().workspaceId; + + if (!workspaceId) { + return; + } + + const objectMetadata = await this.objectMetadataRepository.findOne({ + where: { + nameSingular: 'workflowRun', + workspaceId, + }, + }); + + if (!objectMetadata) { + throw new WorkflowRunException( + 'Object metadata not found', + WorkflowRunExceptionCode.FAILURE, + ); + } + + const workflowRunRepository = + await this.twentyORMManager.getRepository( + 'workflowRun', + ); + + const workflowRunAfter = await workflowRunRepository.findOneBy({ + id: workflowRunBefore.id, + }); + + if (!workflowRunAfter) { + throw new WorkflowRunException( + 'WorkflowRun not found', + WorkflowRunExceptionCode.FAILURE, + ); + } + + this.workspaceEventEmitter.emitDatabaseBatchEvent({ + objectMetadataNameSingular: 'workflowRun', + action: DatabaseEventAction.UPDATED, + events: [ + { + recordId: workflowRunBefore.id, + objectMetadata, + properties: { + after: workflowRunAfter, + before: workflowRunBefore, + updatedFields, + diff, + }, + }, + ], + workspaceId, + }); + } } diff --git a/yarn.lock b/yarn.lock index 0beeaeb53..cc5ec7f55 100644 --- a/yarn.lock +++ b/yarn.lock @@ -36139,6 +36139,20 @@ __metadata: languageName: node linkType: hard +"graphql-redis-subscriptions@npm:^2.7.0": + version: 2.7.0 + resolution: "graphql-redis-subscriptions@npm:2.7.0" + dependencies: + ioredis: "npm:^5.3.2" + peerDependencies: + graphql-subscriptions: ^1.0.0 || ^2.0.0 || ^3.0.0 + dependenciesMeta: + ioredis: + optional: true + checksum: 10c0/f98e9a16aa60d5470f6916f5a85b0b91898e3ec341a70ae3ddac878aa5b415dae9081ba872afdab5873cef3933fde1c3f1ee690ffa6d5b6d164a9165aed5cad1 + languageName: node + linkType: hard + "graphql-request@npm:^6.0.0": version: 6.1.0 resolution: "graphql-request@npm:6.1.0" @@ -36177,6 +36191,15 @@ __metadata: languageName: node linkType: hard +"graphql-sse@npm:^2.5.4": + version: 2.5.4 + resolution: "graphql-sse@npm:2.5.4" + peerDependencies: + graphql: ">=0.11 <=16" + checksum: 10c0/b2635a4098b86492ecb04e7aab7efb715847ad11b785591cf90cf1f342ba78a03db0c6cf903975a7ad9e2c278dee55cd9e2e9d7edb560f334528bb8ee926ed95 + languageName: node + linkType: hard + "graphql-subscriptions@npm:2.0.0": version: 2.0.0 resolution: "graphql-subscriptions@npm:2.0.0" @@ -38133,9 +38156,9 @@ __metadata: languageName: node linkType: hard -"ioredis@npm:^5.4.1": - version: 5.4.2 - resolution: "ioredis@npm:5.4.2" +"ioredis@npm:^5.4.1, ioredis@npm:^5.6.0": + version: 5.6.0 + resolution: "ioredis@npm:5.6.0" dependencies: "@ioredis/commands": "npm:^1.1.1" cluster-key-slot: "npm:^1.1.0" @@ -38146,7 +38169,7 @@ __metadata: redis-errors: "npm:^1.2.0" redis-parser: "npm:^3.0.0" standard-as-callback: "npm:^2.1.0" - checksum: 10c0/e59d2cceb43ed74b487d7b50fa91b93246e734e5d4835c7e62f64e44da072f12ab43b044248012e6f8b76c61a7c091a2388caad50e8ad69a8ce5515a730b23b8 + checksum: 10c0/a885e5146640fc448706871290ef424ffa39af561f7ee3cf1590085209a509f85e99082bdaaf3cd32fa66758aea3fc2055d1109648ddca96fac4944bf2092c30 languageName: node linkType: hard @@ -55380,7 +55403,9 @@ __metadata: graphql-fields: "npm:^2.0.3" graphql-middleware: "npm:^6.1.35" graphql-rate-limit: "npm:^3.3.0" + graphql-redis-subscriptions: "npm:^2.7.0" graphql-scalars: "npm:^1.23.0" + graphql-sse: "npm:^2.5.4" graphql-subscriptions: "npm:2.0.0" graphql-tag: "npm:^2.12.6" graphql-type-json: "npm:^0.3.2"