Add billing meter event service (#9865)
Solves : https://github.com/twentyhq/private-issues/issues/241 https://github.com/twentyhq/private-issues/issues/254 **TLDR:** - Add BillingMeterEventService and StripeBillingMeterEventService in order to send billing meter events to stripe. - Plugged the service into workflow node execution for testing purposes (more improvements on this area will be done in the next PR's) **In order to test:** - Have the environment variable IS_BILLING_ENABLED set to true and add the other required environment variables for Billing to work - Do a database reset (to ensure that the new feature flag is properly added and that the billing tables are created) - Run the command: npx nx run twenty-server:command billing:sync-plans-data (if you don't do that the products and prices will not be present in the database) - Run the server , the frontend, the worker, and the stripe listen command (stripe listen --forward-to http://localhost:3000/billing/webhooks) - Buy a subscription for the Acme workspace - Create a workflow and run it - After the run has been finished check in sprite the quantity of events in the CreditMeter, you should see that there is a new occurence with value one. **Take into consideration:** - I used an eventName that I have made a long time ago, so it hasn't a significant naming. I'm updating the meters and associated prices in stripe to use the correct meter with a more clearer eventName. - I put some error handling in the execution of the workflow nodes, this is still incomplete and needs some refinement, I would like the feedback of the workflows track for a more cleaner approach
This commit is contained in:
committed by
GitHub
parent
1b3181b14e
commit
0d6f4a32a7
@ -1,5 +1,10 @@
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { BILLING_EXECUTE_BILLED_FUNCTION } from 'src/engine/core-modules/billing/constants/billing-execute-billed-function.constant';
|
||||
import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names';
|
||||
import { BillingUsageEvent } from 'src/engine/core-modules/billing/types/billing-usage-event.type';
|
||||
import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory';
|
||||
import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter';
|
||||
import {
|
||||
WorkflowRunOutput,
|
||||
WorkflowRunStatus,
|
||||
@ -19,7 +24,11 @@ export type WorkflowExecutorOutput = {
|
||||
@Injectable()
|
||||
export class WorkflowExecutorWorkspaceService {
|
||||
private readonly logger = new Logger(WorkflowExecutorWorkspaceService.name);
|
||||
constructor(private readonly workflowActionFactory: WorkflowActionFactory) {}
|
||||
constructor(
|
||||
private readonly workflowActionFactory: WorkflowActionFactory,
|
||||
private readonly workspaceEventEmitter: WorkspaceEventEmitter,
|
||||
private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory,
|
||||
) {}
|
||||
|
||||
async execute({
|
||||
currentStepIndex,
|
||||
@ -64,6 +73,10 @@ export class WorkflowExecutorWorkspaceService {
|
||||
result.error?.errorMessage ??
|
||||
(result.result ? undefined : 'Execution result error, no data or error');
|
||||
|
||||
if (!error) {
|
||||
this.sendUsageEvent();
|
||||
}
|
||||
|
||||
const updatedStepOutput = {
|
||||
id: step.id,
|
||||
name: step.name,
|
||||
@ -122,4 +135,20 @@ export class WorkflowExecutorWorkspaceService {
|
||||
|
||||
return { ...updatedOutput, status: WorkflowRunStatus.FAILED };
|
||||
}
|
||||
|
||||
async sendUsageEvent() {
|
||||
const workspaceId =
|
||||
this.scopedWorkspaceContextFactory.create().workspaceId ?? '';
|
||||
|
||||
this.workspaceEventEmitter.emitCustomBatchEvent<BillingUsageEvent>(
|
||||
BILLING_EXECUTE_BILLED_FUNCTION,
|
||||
[
|
||||
{
|
||||
eventName: BillingMeterEventName.WORKFLOW_NODE_RUN,
|
||||
value: 1,
|
||||
},
|
||||
],
|
||||
workspaceId,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import { Module } from '@nestjs/common';
|
||||
|
||||
import { BillingModule } from 'src/engine/core-modules/billing/billing.module';
|
||||
import { ThrottlerModule } from 'src/engine/core-modules/throttler/throttler.module';
|
||||
import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module';
|
||||
import { WorkflowExecutorModule } from 'src/modules/workflow/workflow-executor/workflow-executor.module';
|
||||
@ -8,7 +9,12 @@ import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runne
|
||||
import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service';
|
||||
|
||||
@Module({
|
||||
imports: [WorkflowCommonModule, WorkflowExecutorModule, ThrottlerModule],
|
||||
imports: [
|
||||
WorkflowCommonModule,
|
||||
WorkflowExecutorModule,
|
||||
ThrottlerModule,
|
||||
BillingModule,
|
||||
],
|
||||
providers: [
|
||||
WorkflowRunnerWorkspaceService,
|
||||
WorkflowRunWorkspaceService,
|
||||
|
||||
@ -1,5 +1,6 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { Injectable, Logger } from '@nestjs/common';
|
||||
|
||||
import { BillingUsageService } from 'src/engine/core-modules/billing/services/billing-usage.service';
|
||||
import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator';
|
||||
import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants';
|
||||
import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service';
|
||||
@ -12,10 +13,12 @@ import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runne
|
||||
|
||||
@Injectable()
|
||||
export class WorkflowRunnerWorkspaceService {
|
||||
private readonly logger = new Logger(WorkflowRunnerWorkspaceService.name);
|
||||
constructor(
|
||||
private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService,
|
||||
@InjectMessageQueue(MessageQueue.workflowQueue)
|
||||
private readonly messageQueueService: MessageQueueService,
|
||||
private readonly billingUsageService: BillingUsageService,
|
||||
) {}
|
||||
|
||||
async run(
|
||||
@ -24,6 +27,14 @@ export class WorkflowRunnerWorkspaceService {
|
||||
payload: object,
|
||||
source: ActorMetadata,
|
||||
) {
|
||||
const canExecuteBilledFunction =
|
||||
await this.billingUsageService.canExecuteBilledFunction(workspaceId);
|
||||
|
||||
if (!canExecuteBilledFunction) {
|
||||
this.logger.log(
|
||||
'Cannot execute billed function, there is no subscription for this workspace',
|
||||
);
|
||||
}
|
||||
const workflowRunId =
|
||||
await this.workflowRunWorkspaceService.createWorkflowRun(
|
||||
workflowVersionId,
|
||||
|
||||
Reference in New Issue
Block a user