feat(analytics): add clickhouse (#11174)
This commit is contained in:
@ -0,0 +1,10 @@
|
||||
CREATE TABLE IF NOT EXISTS events
|
||||
(
|
||||
`event` LowCardinality(String),
|
||||
`timestamp` DateTime64(3),
|
||||
`userId` String DEFAULT '',
|
||||
`workspaceId` String DEFAULT '',
|
||||
`properties` JSON
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY (event, workspaceId, timestamp);
|
||||
@ -0,0 +1,10 @@
|
||||
CREATE TABLE IF NOT EXISTS pageview
|
||||
(
|
||||
`name` LowCardinality(String),
|
||||
`timestamp` DateTime64(3),
|
||||
`properties` JSON,
|
||||
`userId` String DEFAULT '',
|
||||
`workspaceId` String DEFAULT ''
|
||||
)
|
||||
ENGINE = MergeTree
|
||||
ORDER BY (name, workspaceId, userId, timestamp);
|
||||
@ -0,0 +1,107 @@
|
||||
/* eslint-disable no-console */
|
||||
import fs from 'fs';
|
||||
import path from 'path';
|
||||
|
||||
import { createClient, ClickHouseClient } from '@clickhouse/client';
|
||||
import { config } from 'dotenv';
|
||||
|
||||
config({
|
||||
path: process.env.NODE_ENV === 'test' ? '.env.test' : '.env',
|
||||
override: true,
|
||||
});
|
||||
|
||||
const clickhouseUrl = () => {
|
||||
const url = process.env.CLICKHOUSE_URL;
|
||||
|
||||
if (url) return url;
|
||||
|
||||
throw new Error(
|
||||
'CLICKHOUSE_URL environment variable is not set. Please set it to the ClickHouse URL.',
|
||||
);
|
||||
};
|
||||
|
||||
async function ensureDatabaseExists() {
|
||||
const [url, database] = clickhouseUrl().split(/\/(?=[^/]*$)/);
|
||||
const client = createClient({
|
||||
url,
|
||||
});
|
||||
|
||||
await client.command({
|
||||
query: `CREATE DATABASE IF NOT EXISTS "${database}"`,
|
||||
});
|
||||
await client.command({
|
||||
query: `SET enable_json_type = 1`,
|
||||
});
|
||||
|
||||
await client.close();
|
||||
}
|
||||
|
||||
async function ensureMigrationTable(client: ClickHouseClient) {
|
||||
await client.command({
|
||||
query: `
|
||||
CREATE TABLE IF NOT EXISTS migrations (
|
||||
filename String,
|
||||
applied_at DateTime DEFAULT now()
|
||||
) ENGINE = MergeTree()
|
||||
ORDER BY filename;
|
||||
`,
|
||||
});
|
||||
}
|
||||
|
||||
async function hasMigrationBeenRun(
|
||||
filename: string,
|
||||
client: ClickHouseClient,
|
||||
): Promise<boolean> {
|
||||
const resultSet = await client.query({
|
||||
query: `SELECT count() as count FROM migrations WHERE filename = {filename:String}`,
|
||||
query_params: { filename },
|
||||
format: 'JSON',
|
||||
});
|
||||
const result = await resultSet.json<{ count: number }>();
|
||||
|
||||
return result.data[0].count > 0;
|
||||
}
|
||||
|
||||
async function recordMigration(filename: string, client: ClickHouseClient) {
|
||||
await client.insert({
|
||||
table: 'migrations',
|
||||
values: [{ filename }],
|
||||
format: 'JSONEachRow',
|
||||
});
|
||||
}
|
||||
|
||||
async function runMigrations() {
|
||||
const dir = path.join(__dirname);
|
||||
const files = fs.readdirSync(dir).filter((f) => f.endsWith('.sql'));
|
||||
|
||||
await ensureDatabaseExists();
|
||||
|
||||
const client = createClient({
|
||||
url: clickhouseUrl(),
|
||||
});
|
||||
|
||||
await ensureMigrationTable(client);
|
||||
|
||||
for (const file of files) {
|
||||
const alreadyRun = await hasMigrationBeenRun(file, client);
|
||||
|
||||
if (alreadyRun) {
|
||||
console.log(`✔︎ Skipping already applied migration: ${file}`);
|
||||
continue;
|
||||
}
|
||||
|
||||
const sql = fs.readFileSync(path.join(dir, file), 'utf8');
|
||||
|
||||
console.log(`⚡ Running ${file}...`);
|
||||
await client.command({ query: sql });
|
||||
await recordMigration(file, client);
|
||||
}
|
||||
|
||||
console.log('✅ All migrations applied.');
|
||||
await client.close();
|
||||
}
|
||||
|
||||
runMigrations().catch((err) => {
|
||||
console.error('Migration error:', err);
|
||||
process.exit(1);
|
||||
});
|
||||
@ -0,0 +1,38 @@
|
||||
/* eslint-disable no-console */
|
||||
import { createClient } from '@clickhouse/client';
|
||||
import { config } from 'dotenv';
|
||||
|
||||
import { fixtures } from 'src/engine/core-modules/analytics/utils/fixtures/fixtures';
|
||||
|
||||
config({
|
||||
path: process.env.NODE_ENV === 'test' ? '.env.test' : '.env',
|
||||
override: true,
|
||||
});
|
||||
|
||||
const client = createClient({
|
||||
url: process.env.CLICKHOUSE_URL,
|
||||
});
|
||||
|
||||
async function seedEvents() {
|
||||
try {
|
||||
console.log(`⚡ Seeding ${fixtures.length} events...`);
|
||||
|
||||
await client.insert({
|
||||
table: 'events',
|
||||
values: fixtures,
|
||||
format: 'JSONEachRow',
|
||||
});
|
||||
|
||||
console.log('✅ All events seeded successfully');
|
||||
} catch (error) {
|
||||
console.error('Error seeding events:', error);
|
||||
throw error;
|
||||
} finally {
|
||||
await client.close();
|
||||
}
|
||||
}
|
||||
|
||||
seedEvents().catch((err) => {
|
||||
console.error('Seeding error:', err);
|
||||
process.exit(1);
|
||||
});
|
||||
Reference in New Issue
Block a user