Implement cohorts in the ClickHouse/PG libraries and all relevant queries

This commit is contained in:
Francis Cao 2025-07-03 12:06:49 -07:00
parent a753809a74
commit e75d009df3
22 changed files with 220 additions and 10983 deletions

View file

@ -89,6 +89,21 @@ function mapFilter(column: string, operator: string, name: string, type: string
}
}
/**
 * Builds a single SQL predicate comparing `column` against a literal cohort
 * filter value.
 *
 * The value is embedded directly in the SQL text as a single-quoted string
 * literal, so backslashes and single quotes are escaped first; otherwise a
 * value such as `o'brien` would terminate the literal early and allow
 * arbitrary SQL to be injected.
 *
 * @param column - Column expression to compare. It is interpolated verbatim,
 *   so it must come from a trusted mapping, never from user input.
 * @param operator - One of the `OPERATORS` values handled below.
 * @param value - Raw filter value to compare against.
 * @returns The predicate text, or an empty string when `operator` is not one
 *   of the supported operators.
 */
function mapCohortFilter(column: string, operator: string, value: string) {
  // Escape backslashes first so the backslashes added for quotes are not doubled.
  const escaped = String(value).replace(/\\/g, '\\\\').replace(/'/g, "\\'");

  switch (operator) {
    case OPERATORS.equals:
      return `${column} = '${escaped}'`;
    case OPERATORS.notEquals:
      return `${column} != '${escaped}'`;
    case OPERATORS.contains:
      // positionCaseInsensitive yields 0 when the needle is absent.
      return `positionCaseInsensitive(${column}, '${escaped}') > 0`;
    case OPERATORS.doesNotContain:
      return `positionCaseInsensitive(${column}, '${escaped}') = 0`;
    default:
      return '';
  }
}
function getFilterQuery(filters: QueryFilters = {}, options: QueryOptions = {}) {
const query = filtersToArray(filters, options).reduce((arr, { name, column, operator }) => {
if (column) {
@ -105,20 +120,40 @@ function getFilterQuery(filters: QueryFilters = {}, options: QueryOptions = {})
return query.join('\n');
}
function getCohortQuery(filters: QueryFilters = {}, options: QueryOptions = {}) {
const query = filtersToArray(filters, options).reduce((arr, { name, column, operator }) => {
if (column) {
arr.push(`and ${mapFilter(column, operator, name)}`);
function getCohortQuery(websiteId: string, filters: QueryFilters = {}, options: QueryOptions = {}) {
const query = filtersToArray(filters, options).reduce(
(arr, { name, column, operator, value }) => {
if (column) {
arr.push(
`${arr.length === 0 ? 'where' : 'and'} ${mapCohortFilter(column, operator, value)}`,
);
if (name === 'referrer') {
arr.push(`and referrer_domain != hostname`);
if (name === 'referrer') {
arr.push(`and referrer_domain != hostname`);
}
}
}
return arr;
}, []);
return arr;
},
[],
);
return query.join('\n');
if (query.length > 0) {
// add website and date range filters
query.push(`and website_id = '${websiteId}'`);
query.push(
`and created_at between parseDateTimeBestEffort('${filters.startDate}') and parseDateTimeBestEffort('${filters.endDate}')`,
);
return `join
(select distinct session_id
from website_event
${query.join('\n')}) cohort
on cohort.session_id = website_event.session_id
`;
}
return '';
}
function getDateQuery(filters: QueryFilters = {}) {
@ -162,7 +197,7 @@ async function parseFilters(websiteId: string, filters: QueryFilters = {}, optio
websiteId,
startDate: maxDate(filters.startDate, new Date(website?.resetAt)),
},
cohortQuery: getCohortQuery(filters),
cohortQuery: getCohortQuery(websiteId, filters?.cohort),
};
}