Build listener to backfill position (#4432)

* Build listener to backfill position

* Fix tests

---------

Co-authored-by: Thomas Trompette <thomast@twenty.com>
This commit is contained in:
Thomas Trompette
2024-03-13 10:27:34 +01:00
committed by GitHub
parent 62d414ee66
commit 7b63cf14bc
16 changed files with 314 additions and 81 deletions

View File

@ -1,6 +1,12 @@
import { BaseObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/base.object-metadata'; import { BaseObjectMetadata } from 'src/workspace/workspace-sync-metadata/standard-objects/base.object-metadata';
export type CreatedObjectMetadata = {
nameSingular: string;
isCustom: boolean;
};
export class ObjectRecordCreateEvent<T extends BaseObjectMetadata> { export class ObjectRecordCreateEvent<T extends BaseObjectMetadata> {
workspaceId: string; workspaceId: string;
createdRecord: T; createdRecord: T;
createdObjectMetadata: CreatedObjectMetadata;
} }

View File

@ -40,7 +40,9 @@ import { MessageQueueModule } from './message-queue/message-queue.module';
useFactory: emailModuleFactory, useFactory: emailModuleFactory,
inject: [EnvironmentService], inject: [EnvironmentService],
}), }),
EventEmitterModule.forRoot(), EventEmitterModule.forRoot({
wildcard: true,
}),
CacheStorageModule, CacheStorageModule,
], ],
exports: [], exports: [],

View File

@ -34,6 +34,8 @@ import { StripeModule } from 'src/core/billing/stripe/stripe.module';
import { Workspace } from 'src/core/workspace/workspace.entity'; import { Workspace } from 'src/core/workspace/workspace.entity';
import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity'; import { FeatureFlagEntity } from 'src/core/feature-flag/feature-flag.entity';
import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity'; import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity';
import { RecordPositionBackfillJob } from 'src/workspace/workspace-query-runner/jobs/record-position-backfill.job';
import { RecordPositionBackfillModule } from 'src/workspace/workspace-query-runner/services/record-position-backfill-module';
@Module({ @Module({
imports: [ imports: [
@ -56,6 +58,7 @@ import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity';
UserModule, UserModule,
UserWorkspaceModule, UserWorkspaceModule,
WorkspaceDataSourceModule, WorkspaceDataSourceModule,
RecordPositionBackfillModule,
], ],
providers: [ providers: [
{ {
@ -100,6 +103,10 @@ import { DataSourceEntity } from 'src/metadata/data-source/data-source.entity';
useClass: DeleteConnectedAccountAssociatedDataJob, useClass: DeleteConnectedAccountAssociatedDataJob,
}, },
{ provide: UpdateSubscriptionJob.name, useClass: UpdateSubscriptionJob }, { provide: UpdateSubscriptionJob.name, useClass: UpdateSubscriptionJob },
{
provide: RecordPositionBackfillJob.name,
useClass: RecordPositionBackfillJob,
},
], ],
}) })
export class JobsModule { export class JobsModule {

View File

@ -7,4 +7,5 @@ export enum MessageQueue {
cronQueue = 'cron-queue', cronQueue = 'cron-queue',
emailQueue = 'email-queue', emailQueue = 'email-queue',
billingQueue = 'billing-queue', billingQueue = 'billing-queue',
recordPositionBackfillQueue = 'record-position-backfill-queue',
} }

View File

@ -1,12 +1,13 @@
import { ObjectMetadataInterface } from 'src/metadata/field-metadata/interfaces/object-metadata.interface'; import {
RecordPositionQueryFactory,
import { RecordPositionQueryFactory } from 'src/workspace/workspace-query-builder/factories/record-position-query.factory'; RecordPositionQueryType,
} from 'src/workspace/workspace-query-builder/factories/record-position-query.factory';
describe('RecordPositionQueryFactory', () => { describe('RecordPositionQueryFactory', () => {
const objectMetadataItem = { const objectMetadataItem = {
isCustom: false, isCustom: false,
nameSingular: 'company', nameSingular: 'company',
} as ObjectMetadataInterface; };
const dataSourceSchema = 'workspace_test'; const dataSourceSchema = 'workspace_test';
const factory: RecordPositionQueryFactory = new RecordPositionQueryFactory(); const factory: RecordPositionQueryFactory = new RecordPositionQueryFactory();
@ -19,6 +20,7 @@ describe('RecordPositionQueryFactory', () => {
const positionValue = 'first'; const positionValue = 'first';
const result = await factory.create( const result = await factory.create(
RecordPositionQueryType.GET,
positionValue, positionValue,
objectMetadataItem, objectMetadataItem,
dataSourceSchema, dataSourceSchema,
@ -34,6 +36,7 @@ describe('RecordPositionQueryFactory', () => {
const positionValue = 'last'; const positionValue = 'last';
const result = await factory.create( const result = await factory.create(
RecordPositionQueryType.GET,
positionValue, positionValue,
objectMetadataItem, objectMetadataItem,
dataSourceSchema, dataSourceSchema,

View File

@ -1,21 +1,74 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { ObjectMetadataInterface } from 'src/metadata/field-metadata/interfaces/object-metadata.interface'; export enum RecordPositionQueryType {
GET = 'GET',
UPDATE = 'UPDATE',
}
@Injectable() @Injectable()
export class RecordPositionQueryFactory { export class RecordPositionQueryFactory {
async create( async create(
recordPositionQueryType: RecordPositionQueryType,
positionValue: 'first' | 'last' | number,
objectMetadata: { isCustom: boolean; nameSingular: string },
dataSourceSchema: string,
recordId?: string,
): Promise<string> {
const name =
(objectMetadata.isCustom ? '_' : '') + objectMetadata.nameSingular;
switch (recordPositionQueryType) {
case RecordPositionQueryType.GET:
if (typeof positionValue === 'number') {
throw new Error(
'RecordPositionQueryType.GET requires positionValue to be a number',
);
}
return this.createForGet(positionValue, name, dataSourceSchema);
case RecordPositionQueryType.UPDATE:
if (typeof positionValue !== 'number') {
throw new Error(
'RecordPositionQueryType.UPDATE requires positionValue to be a number',
);
}
if (!recordId) {
throw new Error(
'RecordPositionQueryType.UPDATE requires recordId to be defined',
);
}
return this.createForUpdate(
positionValue,
name,
dataSourceSchema,
recordId,
);
default:
throw new Error('Invalid RecordPositionQueryType');
}
}
private async createForGet(
positionValue: 'first' | 'last', positionValue: 'first' | 'last',
objectMetadataItem: ObjectMetadataInterface, name: string,
dataSourceSchema: string, dataSourceSchema: string,
): Promise<string> { ): Promise<string> {
const orderByDirection = positionValue === 'first' ? 'ASC' : 'DESC'; const orderByDirection = positionValue === 'first' ? 'ASC' : 'DESC';
const name =
(objectMetadataItem.isCustom ? '_' : '') +
objectMetadataItem.nameSingular;
return `SELECT position FROM ${dataSourceSchema}."${name}" return `SELECT position FROM ${dataSourceSchema}."${name}"
WHERE "position" IS NOT NULL ORDER BY "position" ${orderByDirection} LIMIT 1`; WHERE "position" IS NOT NULL ORDER BY "position" ${orderByDirection} LIMIT 1`;
} }
private async createForUpdate(
positionValue: number,
name: string,
dataSourceSchema: string,
recordId: string,
): Promise<string> {
return `UPDATE ${dataSourceSchema}."${name}"
SET "position" = ${positionValue}
WHERE "id" = '${recordId}'`;
}
} }

View File

@ -3,43 +3,33 @@ import { Test, TestingModule } from '@nestjs/testing';
import { WorkspaceQueryRunnerOptions } from 'src/workspace/workspace-query-runner/interfaces/query-runner-option.interface'; import { WorkspaceQueryRunnerOptions } from 'src/workspace/workspace-query-runner/interfaces/query-runner-option.interface';
import { FieldMetadataInterface } from 'src/metadata/field-metadata/interfaces/field-metadata.interface'; import { FieldMetadataInterface } from 'src/metadata/field-metadata/interfaces/field-metadata.interface';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { RecordPositionQueryFactory } from 'src/workspace/workspace-query-builder/factories/record-position-query.factory';
import { QueryRunnerArgsFactory } from 'src/workspace/workspace-query-runner/factories/query-runner-args.factory'; import { QueryRunnerArgsFactory } from 'src/workspace/workspace-query-runner/factories/query-runner-args.factory';
import { FieldMetadataType } from 'src/metadata/field-metadata/field-metadata.entity'; import { FieldMetadataType } from 'src/metadata/field-metadata/field-metadata.entity';
import { RecordPositionFactory } from 'src/workspace/workspace-query-runner/factories/record-position.factory';
describe('QueryRunnerArgsFactory', () => { describe('QueryRunnerArgsFactory', () => {
const workspaceDataSourceService = { const recordPositionFactory = {
getSchemaName: jest.fn().mockResolvedValue('test schema'), create: jest.fn().mockResolvedValue(2),
executeRawQuery: jest.fn(),
};
const recordPositionQueryFactory = {
create: jest.fn().mockResolvedValue('test query'),
}; };
const options = { const options = {
fieldMetadataCollection: [ fieldMetadataCollection: [
{ name: 'position', type: FieldMetadataType.POSITION }, { name: 'position', type: FieldMetadataType.POSITION },
] as FieldMetadataInterface[], ] as FieldMetadataInterface[],
objectMetadataItem: { isCustom: true, nameSingular: 'test' },
} as WorkspaceQueryRunnerOptions; } as WorkspaceQueryRunnerOptions;
let factory: QueryRunnerArgsFactory; let factory: QueryRunnerArgsFactory;
beforeEach(async () => { beforeEach(async () => {
jest.resetAllMocks();
const module: TestingModule = await Test.createTestingModule({ const module: TestingModule = await Test.createTestingModule({
providers: [ providers: [
QueryRunnerArgsFactory, QueryRunnerArgsFactory,
{ {
provide: RecordPositionQueryFactory, provide: RecordPositionFactory,
useValue: { useValue: {
create: recordPositionQueryFactory.create, create: recordPositionFactory.create,
}, },
}, },
{
provide: WorkspaceDataSourceService,
useValue: workspaceDataSourceService,
},
], ],
}).compile(); }).compile();
@ -63,17 +53,10 @@ describe('QueryRunnerArgsFactory', () => {
it('should override args when of type array', async () => { it('should override args when of type array', async () => {
const args = { data: [{ id: 1 }, { position: 'last' }] }; const args = { data: [{ id: 1 }, { position: 'last' }] };
workspaceDataSourceService.executeRawQuery.mockResolvedValue([
{ position: 1 },
]);
const result = await factory.create(args, options); const result = await factory.create(args, options);
expect(result).toEqual({ expect(result).toEqual({
data: [ data: [{ id: 1 }, { position: 2 }],
{ id: 1 },
{ position: 2 }, // Calculates 1 + 1
],
}); });
}); });
}); });

View File

@ -1,3 +1,7 @@
import { QueryRunnerArgsFactory } from 'src/workspace/workspace-query-runner/factories/query-runner-args.factory'; import { RecordPositionFactory } from './record-position.factory';
import { QueryRunnerArgsFactory } from './query-runner-args.factory';
export const workspaceQueryRunnerFactories = [QueryRunnerArgsFactory]; export const workspaceQueryRunnerFactories = [
QueryRunnerArgsFactory,
RecordPositionFactory,
];

View File

@ -1,19 +1,15 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { WorkspaceQueryRunnerOptions } from 'src/workspace/workspace-query-runner/interfaces/query-runner-option.interface';
import { FieldMetadataInterface } from 'src/metadata/field-metadata/interfaces/field-metadata.interface'; import { FieldMetadataInterface } from 'src/metadata/field-metadata/interfaces/field-metadata.interface';
import { ObjectMetadataInterface } from 'src/metadata/field-metadata/interfaces/object-metadata.interface'; import { WorkspaceQueryRunnerOptions } from 'src/workspace/workspace-query-runner/interfaces/query-runner-option.interface';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import { RecordPositionQueryFactory } from 'src/workspace/workspace-query-builder/factories/record-position-query.factory';
import { FieldMetadataType } from 'src/metadata/field-metadata/field-metadata.entity'; import { FieldMetadataType } from 'src/metadata/field-metadata/field-metadata.entity';
import { RecordPositionFactory } from './record-position.factory';
@Injectable() @Injectable()
export class QueryRunnerArgsFactory { export class QueryRunnerArgsFactory {
constructor( constructor(private readonly recordPositionFactory: RecordPositionFactory) {}
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly recordPositionQueryFactory: RecordPositionQueryFactory,
) {}
async create( async create(
args: Record<string, any>, args: Record<string, any>,
@ -54,9 +50,12 @@ export class QueryRunnerArgsFactory {
case FieldMetadataType.POSITION: case FieldMetadataType.POSITION:
return [ return [
key, key,
await this.buildPositionValue( await this.recordPositionFactory.create(
value, value,
options.objectMetadataItem, {
isCustom: options.objectMetadataItem.isCustom,
nameSingular: options.objectMetadataItem.nameSingular,
},
options.workspaceId, options.workspaceId,
), ),
]; ];
@ -70,36 +69,4 @@ export class QueryRunnerArgsFactory {
return Object.fromEntries(newArgEntries); return Object.fromEntries(newArgEntries);
} }
private async buildPositionValue(
value: number | 'first' | 'last',
objectMetadataItem: ObjectMetadataInterface,
workspaceId: string,
) {
if (typeof value === 'number') {
return value;
}
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const query = await this.recordPositionQueryFactory.create(
value,
objectMetadataItem,
dataSourceSchema,
);
const records = await this.workspaceDataSourceService.executeRawQuery(
query,
[],
workspaceId,
undefined,
);
return (
(value === 'first'
? records[0]?.position / 2
: records[0]?.position + 1) || 1
);
}
} }

View File

@ -0,0 +1,48 @@
import { Injectable } from '@nestjs/common';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import {
RecordPositionQueryFactory,
RecordPositionQueryType,
} from 'src/workspace/workspace-query-builder/factories/record-position-query.factory';
@Injectable()
export class RecordPositionFactory {
constructor(
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
private readonly recordPositionQueryFactory: RecordPositionQueryFactory,
) {}
async create(
value: number | 'first' | 'last',
objectMetadata: { isCustom: boolean; nameSingular: string },
workspaceId: string,
): Promise<number> {
if (typeof value === 'number') {
return value;
}
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const query = await this.recordPositionQueryFactory.create(
RecordPositionQueryType.GET,
value,
objectMetadata,
dataSourceSchema,
);
const records = await this.workspaceDataSourceService.executeRawQuery(
query,
[],
workspaceId,
undefined,
);
return (
(value === 'first'
? records[0]?.position / 2
: records[0]?.position + 1) || 1
);
}
}

View File

@ -0,0 +1,28 @@
import { Injectable } from '@nestjs/common';
import { MessageQueueJob } from 'src/integrations/message-queue/interfaces/message-queue-job.interface';
import { RecordPositionBackfillService } from 'src/workspace/workspace-query-runner/services/record-position-backfill-service';
export type RecordPositionBackfillJobData = {
workspaceId: string;
objectMetadata: { nameSingular: string; isCustom: boolean };
recordId: string;
};
@Injectable()
export class RecordPositionBackfillJob
implements MessageQueueJob<RecordPositionBackfillJobData>
{
constructor(
private readonly recordPositionBackfillService: RecordPositionBackfillService,
) {}
async handle(data: RecordPositionBackfillJobData): Promise<void> {
this.recordPositionBackfillService.backfill(
data.workspaceId,
data.objectMetadata,
data.recordId,
);
}
}

View File

@ -0,0 +1,56 @@
import { Inject, Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import {
CreatedObjectMetadata,
ObjectRecordCreateEvent,
} from 'src/integrations/event-emitter/types/object-record-create.event';
import { MessageQueue } from 'src/integrations/message-queue/message-queue.constants';
import { MessageQueueService } from 'src/integrations/message-queue/services/message-queue.service';
import {
RecordPositionBackfillJob,
RecordPositionBackfillJobData,
} from 'src/workspace/workspace-query-runner/jobs/record-position-backfill.job';
@Injectable()
export class RecordPositionListener {
constructor(
@Inject(MessageQueue.recordPositionBackfillQueue)
private readonly messageQueueService: MessageQueueService,
) {}
@OnEvent('*.created')
async handleAllCreate(payload: ObjectRecordCreateEvent<any>) {
if (!hasPositionField(payload.createdObjectMetadata)) {
return;
}
if (hasPositionSet(payload.createdRecord)) {
return;
}
this.messageQueueService.add<RecordPositionBackfillJobData>(
RecordPositionBackfillJob.name,
{
workspaceId: payload.workspaceId,
recordId: payload.createdRecord.id,
objectMetadata: payload.createdObjectMetadata,
},
);
}
}
const hasPositionField = (
createdObjectMetadata: CreatedObjectMetadata,
): boolean => {
return (
createdObjectMetadata.isCustom ||
['opportunity', 'company', 'people'].includes(
createdObjectMetadata.nameSingular,
)
);
};
const hasPositionSet = (createdRecord: any): boolean => {
return !!createdRecord?.position;
};

View File

@ -0,0 +1,17 @@
import { Module } from '@nestjs/common';
import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module';
import { RecordPositionQueryFactory } from 'src/workspace/workspace-query-builder/factories/record-position-query.factory';
import { RecordPositionFactory } from 'src/workspace/workspace-query-runner/factories/record-position.factory';
import { RecordPositionBackfillService } from 'src/workspace/workspace-query-runner/services/record-position-backfill-service';
@Module({
imports: [WorkspaceDataSourceModule],
providers: [
RecordPositionFactory,
RecordPositionQueryFactory,
RecordPositionBackfillService,
],
exports: [RecordPositionBackfillService],
})
export class RecordPositionBackfillModule {}

View File

@ -0,0 +1,49 @@
import { Injectable } from '@nestjs/common';
import { ObjectMetadataInterface } from 'src/metadata/field-metadata/interfaces/object-metadata.interface';
import { WorkspaceDataSourceService } from 'src/workspace/workspace-datasource/workspace-datasource.service';
import {
RecordPositionQueryFactory,
RecordPositionQueryType,
} from 'src/workspace/workspace-query-builder/factories/record-position-query.factory';
import { RecordPositionFactory } from 'src/workspace/workspace-query-runner/factories/record-position.factory';
@Injectable()
export class RecordPositionBackfillService {
constructor(
private readonly recordPositionFactory: RecordPositionFactory,
private readonly recordPositionQueryFactory: RecordPositionQueryFactory,
private readonly workspaceDataSourceService: WorkspaceDataSourceService,
) {}
async backfill(
workspaceId: string,
objectMetadata: { nameSingular: string; isCustom: boolean },
recordId: string,
) {
const position = await this.recordPositionFactory.create(
'last',
objectMetadata as ObjectMetadataInterface,
workspaceId,
);
const dataSourceSchema =
this.workspaceDataSourceService.getSchemaName(workspaceId);
const query = await this.recordPositionQueryFactory.create(
RecordPositionQueryType.UPDATE,
position,
objectMetadata as ObjectMetadataInterface,
dataSourceSchema,
recordId,
);
this.workspaceDataSourceService.executeRawQuery(
query,
[],
workspaceId,
undefined,
);
}
}

View File

@ -4,6 +4,7 @@ import { WorkspaceQueryBuilderModule } from 'src/workspace/workspace-query-build
import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module'; import { WorkspaceDataSourceModule } from 'src/workspace/workspace-datasource/workspace-datasource.module';
import { WorkspacePreQueryHookModule } from 'src/workspace/workspace-query-runner/workspace-pre-query-hook/workspace-pre-query-hook.module'; import { WorkspacePreQueryHookModule } from 'src/workspace/workspace-query-runner/workspace-pre-query-hook/workspace-pre-query-hook.module';
import { workspaceQueryRunnerFactories } from 'src/workspace/workspace-query-runner/factories'; import { workspaceQueryRunnerFactories } from 'src/workspace/workspace-query-runner/factories';
import { RecordPositionListener } from 'src/workspace/workspace-query-runner/listeners/record-position.listener';
import { WorkspaceQueryRunnerService } from './workspace-query-runner.service'; import { WorkspaceQueryRunnerService } from './workspace-query-runner.service';
@ -13,7 +14,11 @@ import { WorkspaceQueryRunnerService } from './workspace-query-runner.service';
WorkspaceDataSourceModule, WorkspaceDataSourceModule,
WorkspacePreQueryHookModule, WorkspacePreQueryHookModule,
], ],
providers: [WorkspaceQueryRunnerService, ...workspaceQueryRunnerFactories], providers: [
WorkspaceQueryRunnerService,
...workspaceQueryRunnerFactories,
RecordPositionListener,
],
exports: [WorkspaceQueryRunnerService], exports: [WorkspaceQueryRunnerService],
}) })
export class WorkspaceQueryRunnerModule {} export class WorkspaceQueryRunnerModule {}

View File

@ -242,7 +242,11 @@ export class WorkspaceQueryRunnerService {
parsedResults.forEach((record) => { parsedResults.forEach((record) => {
this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.created`, { this.eventEmitter.emit(`${objectMetadataItem.nameSingular}.created`, {
workspaceId, workspaceId,
createdRecord: [this.removeNestedProperties(record)], createdRecord: this.removeNestedProperties(record),
createdObjectMetadata: {
nameSingular: objectMetadataItem.nameSingular,
isCustom: objectMetadataItem.isCustom,
},
} satisfies ObjectRecordCreateEvent<any>); } satisfies ObjectRecordCreateEvent<any>);
}); });