diff --git a/packages/twenty-server/src/engine/core-modules/billing/billing.exception.ts b/packages/twenty-server/src/engine/core-modules/billing/billing.exception.ts index e5d623085..8e8027109 100644 --- a/packages/twenty-server/src/engine/core-modules/billing/billing.exception.ts +++ b/packages/twenty-server/src/engine/core-modules/billing/billing.exception.ts @@ -15,4 +15,6 @@ export enum BillingExceptionCode { BILLING_PRODUCT_NOT_FOUND = 'BILLING_PRODUCT_NOT_FOUND', BILLING_PRICE_NOT_FOUND = 'BILLING_PRICE_NOT_FOUND', BILLING_SUBSCRIPTION_EVENT_WORKSPACE_NOT_FOUND = 'BILLING_SUBSCRIPTION_EVENT_WORKSPACE_NOT_FOUND', + BILLING_ACTIVE_SUBSCRIPTION_NOT_FOUND = 'BILLING_ACTIVE_SUBSCRIPTION_NOT_FOUND', + BILLING_METER_EVENT_FAILED = 'BILLING_METER_EVENT_FAILED', } diff --git a/packages/twenty-server/src/engine/core-modules/billing/billing.module.ts b/packages/twenty-server/src/engine/core-modules/billing/billing.module.ts index 3007a9383..bce60ac24 100644 --- a/packages/twenty-server/src/engine/core-modules/billing/billing.module.ts +++ b/packages/twenty-server/src/engine/core-modules/billing/billing.module.ts @@ -13,11 +13,13 @@ import { BillingProduct } from 'src/engine/core-modules/billing/entities/billing import { BillingSubscriptionItem } from 'src/engine/core-modules/billing/entities/billing-subscription-item.entity'; import { BillingSubscription } from 'src/engine/core-modules/billing/entities/billing-subscription.entity'; import { BillingRestApiExceptionFilter } from 'src/engine/core-modules/billing/filters/billing-api-exception.filter'; +import { BillingExecuteBilledFunctionListener } from 'src/engine/core-modules/billing/listeners/billing-execute-billed-function.listener'; import { BillingWorkspaceMemberListener } from 'src/engine/core-modules/billing/listeners/billing-workspace-member.listener'; import { BillingPlanService } from 'src/engine/core-modules/billing/services/billing-plan.service'; import { BillingPortalWorkspaceService } from 'src/engine/core-modules/billing/services/billing-portal.workspace-service'; import { BillingProductService } from 'src/engine/core-modules/billing/services/billing-product.service'; import { BillingSubscriptionService } from 'src/engine/core-modules/billing/services/billing-subscription.service'; +import { BillingUsageService } from 'src/engine/core-modules/billing/services/billing-usage.service'; import { BillingService } from 'src/engine/core-modules/billing/services/billing.service'; import { StripeModule } from 'src/engine/core-modules/billing/stripe/stripe.module'; import { BillingWebhookEntitlementService } from 'src/engine/core-modules/billing/webhooks/services/billing-webhook-entitlement.service'; @@ -63,17 +65,20 @@ import { Workspace } from 'src/engine/core-modules/workspace/workspace.entity'; BillingResolver, BillingPlanService, BillingWorkspaceMemberListener, + BillingExecuteBilledFunctionListener, BillingService, BillingWebhookProductService, BillingWebhookPriceService, BillingRestApiExceptionFilter, BillingSyncCustomerDataCommand, BillingSyncPlansDataCommand, + BillingUsageService, ], exports: [ BillingSubscriptionService, BillingPortalWorkspaceService, BillingService, + BillingUsageService, ], }) export class BillingModule {} diff --git a/packages/twenty-server/src/engine/core-modules/billing/constants/billing-execute-billed-function.constant.ts b/packages/twenty-server/src/engine/core-modules/billing/constants/billing-execute-billed-function.constant.ts new file mode 100644 index 000000000..cf6a04d2a --- /dev/null +++ b/packages/twenty-server/src/engine/core-modules/billing/constants/billing-execute-billed-function.constant.ts @@ -0,0 +1,2 @@ +export const BILLING_EXECUTE_BILLED_FUNCTION = + 'billing_execute_billed_function'; diff --git a/packages/twenty-server/src/engine/core-modules/billing/enums/billing-meter-event-names.ts b/packages/twenty-server/src/engine/core-modules/billing/enums/billing-meter-event-names.ts new file mode 100644 index 000000000..1bd781ab3 --- /dev/null +++ b/packages/twenty-server/src/engine/core-modules/billing/enums/billing-meter-event-names.ts @@ -0,0 +1,5 @@ +export enum BillingMeterEventName { + WORKFLOW_NODE_RUN = 'creditexecutiontest1', +} +//this is a test event name (no conventions) would you want camel case?, snake case, or all caps? +//Something like workflowNodeRunBillingMeterEvent ? diff --git a/packages/twenty-server/src/engine/core-modules/billing/listeners/billing-execute-billed-function.listener.ts b/packages/twenty-server/src/engine/core-modules/billing/listeners/billing-execute-billed-function.listener.ts new file mode 100644 index 000000000..522692f9d --- /dev/null +++ b/packages/twenty-server/src/engine/core-modules/billing/listeners/billing-execute-billed-function.listener.ts @@ -0,0 +1,39 @@ +import { Injectable } from '@nestjs/common'; + +import { OnCustomBatchEvent } from 'src/engine/api/graphql/graphql-query-runner/decorators/on-custom-batch-event.decorator'; +import { BILLING_EXECUTE_BILLED_FUNCTION } from 'src/engine/core-modules/billing/constants/billing-execute-billed-function.constant'; +import { BillingUsageService } from 'src/engine/core-modules/billing/services/billing-usage.service'; +import { BillingUsageEvent } from 'src/engine/core-modules/billing/types/billing-usage-event.type'; +import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service'; +import { WorkspaceEventBatch } from 'src/engine/workspace-event-emitter/types/workspace-event.type'; + +@Injectable() +export class BillingExecuteBilledFunctionListener { + constructor( + private readonly billingUsageService: BillingUsageService, + private readonly environmentService: EnvironmentService, + ) {} + + @OnCustomBatchEvent(BILLING_EXECUTE_BILLED_FUNCTION) + async handleExecuteBilledFunctionEvent( + payload: WorkspaceEventBatch, + ) { + if (!this.environmentService.get('IS_BILLING_ENABLED')) { + return; + } + + const canExecuteBilledFunction = + await this.billingUsageService.canExecuteBilledFunction( + payload.workspaceId, + ); + + if (!canExecuteBilledFunction) { + return; + } + + await this.billingUsageService.billUsage({ + workspaceId: payload.workspaceId, + billingEvents: payload.events, + }); + } +} diff --git a/packages/twenty-server/src/engine/core-modules/billing/services/billing-plan.service.ts b/packages/twenty-server/src/engine/core-modules/billing/services/billing-plan.service.ts index cecf60a80..3fa1f9e21 100644 --- a/packages/twenty-server/src/engine/core-modules/billing/services/billing-plan.service.ts +++ b/packages/twenty-server/src/engine/core-modules/billing/services/billing-plan.service.ts @@ -125,12 +125,12 @@ export class BillingPlanService { } const { baseProduct, meteredProducts, otherLicensedProducts } = plan; const baseProductPrice = baseProduct.billingPrices.find( - (price) => price.interval === interval, + (price) => price.interval === interval && price.active, ); if (!baseProductPrice) { throw new BillingException( - 'Base product price not found for given interval', + 'Base product active price not found for given interval', BillingExceptionCode.BILLING_PRICE_NOT_FOUND, ); } diff --git a/packages/twenty-server/src/engine/core-modules/billing/services/billing-product.service.ts b/packages/twenty-server/src/engine/core-modules/billing/services/billing-product.service.ts index 0f68b9996..34ac70626 100644 --- a/packages/twenty-server/src/engine/core-modules/billing/services/billing-product.service.ts +++ b/packages/twenty-server/src/engine/core-modules/billing/services/billing-product.service.ts @@ -22,7 +22,9 @@ export class BillingProductService { billingProductsByPlan: BillingProduct[]; }): BillingPrice[] { const billingPrices = billingProductsByPlan.flatMap((product) => - product.billingPrices.filter((price) => price.interval === interval), + product.billingPrices.filter( + (price) => price.interval === interval && price.active, + ), ); return billingPrices; diff --git a/packages/twenty-server/src/engine/core-modules/billing/services/billing-usage.service.ts b/packages/twenty-server/src/engine/core-modules/billing/services/billing-usage.service.ts new file mode 100644 index 000000000..429191f39 --- /dev/null +++ b/packages/twenty-server/src/engine/core-modules/billing/services/billing-usage.service.ts @@ -0,0 +1,90 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; + +import { Repository } from 'typeorm'; + +import { + BillingException, + BillingExceptionCode, +} from 'src/engine/core-modules/billing/billing.exception'; +import { BillingCustomer } from 'src/engine/core-modules/billing/entities/billing-customer.entity'; +import { BillingSubscriptionService } from 'src/engine/core-modules/billing/services/billing-subscription.service'; +import { StripeBillingMeterEventService } from 'src/engine/core-modules/billing/stripe/services/stripe-billing-meter-event.service'; +import { BillingUsageEvent } from 'src/engine/core-modules/billing/types/billing-usage-event.type'; +import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service'; +import { FeatureFlagKey } from 'src/engine/core-modules/feature-flag/enums/feature-flag-key.enum'; +import { FeatureFlagService } from 'src/engine/core-modules/feature-flag/services/feature-flag.service'; + +@Injectable() +export class BillingUsageService { + protected readonly logger = new Logger(BillingUsageService.name); + constructor( + @InjectRepository(BillingCustomer, 'core') + private readonly billingCustomerRepository: Repository, + private readonly featureFlagService: FeatureFlagService, + private readonly billingSubscriptionService: BillingSubscriptionService, + private readonly environmentService: EnvironmentService, + private readonly stripeBillingMeterEventService: StripeBillingMeterEventService, + ) {} + + async canExecuteBilledFunction(workspaceId: string): Promise { + const isBillingEnabled = this.environmentService.get('IS_BILLING_ENABLED'); + const isBillingPlansEnabled = + await this.featureFlagService.isFeatureEnabled( + FeatureFlagKey.IsBillingPlansEnabled, + workspaceId, + ); + + if (!isBillingPlansEnabled || !isBillingEnabled) { + return true; + } + + const billingSubscription = + await this.billingSubscriptionService.getCurrentBillingSubscriptionOrThrow( + { + workspaceId, + }, + ); + + if (!billingSubscription) { + return false; + } + + return true; + } + + async billUsage({ + workspaceId, + billingEvents, + }: { + workspaceId: string; + billingEvents: BillingUsageEvent[]; + }) { + const workspaceStripeCustomer = + await this.billingCustomerRepository.findOne({ + where: { + workspaceId, + }, + }); + + if (!workspaceStripeCustomer) { + throw new BillingException( + 'Stripe customer not found', + BillingExceptionCode.BILLING_CUSTOMER_NOT_FOUND, + ); + } + + try { + await this.stripeBillingMeterEventService.sendBillingMeterEvent({ + eventName: billingEvents[0].eventName, + value: billingEvents[0].value, + stripeCustomerId: workspaceStripeCustomer.stripeCustomerId, + }); + } catch (error) { + throw new BillingException( + 'Failed to send billing meter events to Cache Service', + BillingExceptionCode.BILLING_METER_EVENT_FAILED, + ); + } + } +} diff --git a/packages/twenty-server/src/engine/core-modules/billing/stripe/services/stripe-billing-meter-event.service.ts b/packages/twenty-server/src/engine/core-modules/billing/stripe/services/stripe-billing-meter-event.service.ts new file mode 100644 index 000000000..62065c5f2 --- /dev/null +++ b/packages/twenty-server/src/engine/core-modules/billing/stripe/services/stripe-billing-meter-event.service.ts @@ -0,0 +1,43 @@ +import { Injectable, Logger } from '@nestjs/common'; + +import Stripe from 'stripe'; + +import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names'; +import { StripeSDKService } from 'src/engine/core-modules/billing/stripe/stripe-sdk/services/stripe-sdk.service'; +import { EnvironmentService } from 'src/engine/core-modules/environment/environment.service'; + +@Injectable() +export class StripeBillingMeterEventService { + protected readonly logger = new Logger(StripeBillingMeterEventService.name); + private readonly stripe: Stripe; + + constructor( + private readonly environmentService: EnvironmentService, + private readonly stripeSDKService: StripeSDKService, + ) { + if (!this.environmentService.get('IS_BILLING_ENABLED')) { + return; + } + this.stripe = this.stripeSDKService.getStripe( + this.environmentService.get('BILLING_STRIPE_API_KEY'), + ); + } + + async sendBillingMeterEvent({ + eventName, + value, + stripeCustomerId, + }: { + eventName: BillingMeterEventName; + value: number; + stripeCustomerId: string; + }) { + await this.stripe.billing.meterEvents.create({ + event_name: eventName, + payload: { + value: value.toString(), + stripe_customer_id: stripeCustomerId, + }, + }); + } +} diff --git a/packages/twenty-server/src/engine/core-modules/billing/stripe/stripe.module.ts b/packages/twenty-server/src/engine/core-modules/billing/stripe/stripe.module.ts index c7e2dfca4..cb1827880 100644 --- a/packages/twenty-server/src/engine/core-modules/billing/stripe/stripe.module.ts +++ b/packages/twenty-server/src/engine/core-modules/billing/stripe/stripe.module.ts @@ -1,5 +1,6 @@ import { Module } from '@nestjs/common'; +import { StripeBillingMeterEventService } from 'src/engine/core-modules/billing/stripe/services/stripe-billing-meter-event.service'; import { StripeBillingMeterService } from 'src/engine/core-modules/billing/stripe/services/stripe-billing-meter.service'; import { StripeBillingPortalService } from 'src/engine/core-modules/billing/stripe/services/stripe-billing-portal.service'; import { StripeCheckoutService } from 'src/engine/core-modules/billing/stripe/services/stripe-checkout.service'; @@ -24,6 +25,7 @@ import { DomainManagerModule } from 'src/engine/core-modules/domain-manager/doma StripeCustomerService, StripePriceService, StripeProductService, + StripeBillingMeterEventService, ], exports: [ StripeWebhookService, @@ -35,6 +37,7 @@ import { DomainManagerModule } from 'src/engine/core-modules/domain-manager/doma StripeSubscriptionItemService, StripeSubscriptionService, StripeProductService, + StripeBillingMeterEventService, ], }) export class StripeModule {} diff --git a/packages/twenty-server/src/engine/core-modules/billing/types/billing-usage-event.type.ts b/packages/twenty-server/src/engine/core-modules/billing/types/billing-usage-event.type.ts new file mode 100644 index 000000000..ec6fba717 --- /dev/null +++ b/packages/twenty-server/src/engine/core-modules/billing/types/billing-usage-event.type.ts @@ -0,0 +1,8 @@ +import { NonNegative } from 'type-fest'; + +import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names'; + +export type BillingUsageEvent = { + eventName: BillingMeterEventName; + value: NonNegative; +}; diff --git a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts index bc2655d7a..aaaf231ce 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-executor/workspace-services/workflow-executor.workspace-service.ts @@ -1,5 +1,10 @@ import { Injectable, Logger } from '@nestjs/common'; +import { BILLING_EXECUTE_BILLED_FUNCTION } from 'src/engine/core-modules/billing/constants/billing-execute-billed-function.constant'; +import { BillingMeterEventName } from 'src/engine/core-modules/billing/enums/billing-meter-event-names'; +import { BillingUsageEvent } from 'src/engine/core-modules/billing/types/billing-usage-event.type'; +import { ScopedWorkspaceContextFactory } from 'src/engine/twenty-orm/factories/scoped-workspace-context.factory'; +import { WorkspaceEventEmitter } from 'src/engine/workspace-event-emitter/workspace-event-emitter'; import { WorkflowRunOutput, WorkflowRunStatus, @@ -19,7 +24,11 @@ export type WorkflowExecutorOutput = { @Injectable() export class WorkflowExecutorWorkspaceService { private readonly logger = new Logger(WorkflowExecutorWorkspaceService.name); - constructor(private readonly workflowActionFactory: WorkflowActionFactory) {} + constructor( + private readonly workflowActionFactory: WorkflowActionFactory, + private readonly workspaceEventEmitter: WorkspaceEventEmitter, + private readonly scopedWorkspaceContextFactory: ScopedWorkspaceContextFactory, + ) {} async execute({ currentStepIndex, @@ -64,6 +73,10 @@ export class WorkflowExecutorWorkspaceService { result.error?.errorMessage ?? (result.result ? undefined : 'Execution result error, no data or error'); + if (!error) { + this.sendUsageEvent(); + } + const updatedStepOutput = { id: step.id, name: step.name, @@ -122,4 +135,20 @@ export class WorkflowExecutorWorkspaceService { return { ...updatedOutput, status: WorkflowRunStatus.FAILED }; } + + async sendUsageEvent() { + const workspaceId = + this.scopedWorkspaceContextFactory.create().workspaceId ?? ''; + + this.workspaceEventEmitter.emitCustomBatchEvent( + BILLING_EXECUTE_BILLED_FUNCTION, + [ + { + eventName: BillingMeterEventName.WORKFLOW_NODE_RUN, + value: 1, + }, + ], + workspaceId, + ); + } } diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts index 19eda68e5..5776d893c 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workflow-runner.module.ts @@ -1,5 +1,6 @@ import { Module } from '@nestjs/common'; +import { BillingModule } from 'src/engine/core-modules/billing/billing.module'; import { ThrottlerModule } from 'src/engine/core-modules/throttler/throttler.module'; import { WorkflowCommonModule } from 'src/modules/workflow/common/workflow-common.module'; import { WorkflowExecutorModule } from 'src/modules/workflow/workflow-executor/workflow-executor.module'; @@ -8,7 +9,12 @@ import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runne import { WorkflowRunnerWorkspaceService } from 'src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service'; @Module({ - imports: [WorkflowCommonModule, WorkflowExecutorModule, ThrottlerModule], + imports: [ + WorkflowCommonModule, + WorkflowExecutorModule, + ThrottlerModule, + BillingModule, + ], providers: [ WorkflowRunnerWorkspaceService, WorkflowRunWorkspaceService, diff --git a/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts b/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts index e0e0020b1..8db64b38c 100644 --- a/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts +++ b/packages/twenty-server/src/modules/workflow/workflow-runner/workspace-services/workflow-runner.workspace-service.ts @@ -1,5 +1,6 @@ -import { Injectable } from '@nestjs/common'; +import { Injectable, Logger } from '@nestjs/common'; +import { BillingUsageService } from 'src/engine/core-modules/billing/services/billing-usage.service'; import { InjectMessageQueue } from 'src/engine/core-modules/message-queue/decorators/message-queue.decorator'; import { MessageQueue } from 'src/engine/core-modules/message-queue/message-queue.constants'; import { MessageQueueService } from 'src/engine/core-modules/message-queue/services/message-queue.service'; @@ -12,10 +13,12 @@ import { WorkflowRunWorkspaceService } from 'src/modules/workflow/workflow-runne @Injectable() export class WorkflowRunnerWorkspaceService { + private readonly logger = new Logger(WorkflowRunnerWorkspaceService.name); constructor( private readonly workflowRunWorkspaceService: WorkflowRunWorkspaceService, @InjectMessageQueue(MessageQueue.workflowQueue) private readonly messageQueueService: MessageQueueService, + private readonly billingUsageService: BillingUsageService, ) {} async run( @@ -24,6 +27,14 @@ export class WorkflowRunnerWorkspaceService { payload: object, source: ActorMetadata, ) { + const canExecuteBilledFunction = + await this.billingUsageService.canExecuteBilledFunction(workspaceId); + + if (!canExecuteBilledFunction) { + this.logger.log( + 'Cannot execute billed function, there is no subscription for this workspace', + ); + } const workflowRunId = await this.workflowRunWorkspaceService.createWorkflowRun( workflowVersionId,