ClickHouse概述
ClickHouse是一个面向列的OLAP(在线分析处理)数据库管理系统,由Yandex开源。它以高性能、低延迟、高压缩比著称,是实时数据分析领域的明星产品。
ClickHouse的核心特性
- 高性能:单表查询可达数十亿行/秒
- 列式存储:适合OLAP场景
- 高压缩比:数据压缩率可达10:1
- 实时分析:支持实时数据写入和查询
- SQL支持:兼容标准SQL语法
- 分布式:支持水平扩展
- 容错:支持副本和分片
ClickHouse的应用场景
- 用户行为分析
- 日志分析
- 监控数据分析
- 商业智能(BI)
- 实时报表
- 广告投放分析
- 金融风控
核心概念
数据库引擎
ClickHouse支持多种数据库引擎:
| 引擎 | 说明 |
|---|---|
| Ordinary | 默认引擎,支持所有表引擎 |
| Lazy | 延迟加载,内存不足时卸载数据 |
| MySQL | 映射到MySQL数据库 |
| PostgreSQL | 映射到PostgreSQL数据库 |
| MaterializedMySQL | 实时同步MySQL数据 |
-- 创建数据库
CREATE DATABASE my_database;
-- 使用Lazy引擎
CREATE DATABASE my_lazy_db ENGINE = Lazy;
-- 映射MySQL数据库
CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'db', 'user', 'password');
表引擎
表引擎是ClickHouse最重要的概念,决定了数据的存储方式和查询性能。
MergeTree系列
| 引擎 | 说明 |
|---|---|
| MergeTree | 基础引擎,支持索引和分区 |
| ReplacingMergeTree | 去重引擎 |
| SummingMergeTree | 聚合引擎 |
| AggregatingMergeTree | 高级聚合引擎 |
| CollapsingMergeTree | 折叠引擎 |
| VersionedCollapsingMergeTree | 版本折叠引擎 |
| GraphiteMergeTree | Graphite数据引擎 |
-- 创建MergeTree表
CREATE TABLE my_table (
id UInt32,
date Date,
name String,
value Float32
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, name)
SETTINGS index_granularity = 8192;
-- 创建ReplacingMergeTree表(去重)
CREATE TABLE dedup_table (
id UInt32,
name String,
version UInt32
) ENGINE = ReplacingMergeTree(version)
ORDER BY id;
-- 创建SummingMergeTree表(聚合)
CREATE TABLE sum_table (
date Date,
key String,
value UInt32
) ENGINE = SummingMergeTree(value)
ORDER BY (date, key);
Log系列
| 引擎 | 说明 |
|---|---|
| TinyLog | 最简单的引擎,不支持索引 |
| StripeLog | 支持索引,性能优于TinyLog |
| Log | 支持索引,读写分离 |
-- 创建TinyLog表
CREATE TABLE tiny_log_table (
id UInt32,
name String
) ENGINE = TinyLog;
Integration系列
| 引擎 | 说明 |
|---|---|
| HDFS | 读取HDFS文件 |
| S3 | 读取S3对象存储 |
| MySQL | 读取MySQL表 |
| PostgreSQL | 读取PostgreSQL表 |
| Kafka | 读取Kafka消息 |
-- 创建HDFS表
CREATE TABLE hdfs_table (
id UInt32,
name String
) ENGINE = HDFS('hdfs://localhost:9000/data/*.csv', 'CSV');
-- 创建Kafka表
CREATE TABLE kafka_table (
id UInt32,
name String
) ENGINE = Kafka('localhost:9092', 'topic', 'group', 'CSV');
数据类型
基本类型
| 类型 | 说明 | 示例 |
|---|---|---|
| UInt8 | 无符号8位整数 | 0-255 |
| UInt16 | 无符号16位整数 | 0-65535 |
| UInt32 | 无符号32位整数 | 0-4294967295 |
| UInt64 | 无符号64位整数 | 0-18446744073709551615 |
| Int8 | 有符号8位整数 | -128-127 |
| Int16 | 有符号16位整数 | -32768-32767 |
| Int32 | 有符号32位整数 | -2147483648-2147483647 |
| Int64 | 有符号64位整数 | -9223372036854775808-9223372036854775807 |
| Float32 | 单精度浮点数 | 3.14 |
| Float64 | 双精度浮点数 | 3.1415926535 |
| String | 字符串 | "hello" |
| FixedString(N) | 固定长度字符串 | "hello" |
| Date | 日期 | 2023-01-01 |
| DateTime | 日期时间 | 2023-01-01 12:00:00 |
复杂类型
| 类型 | 说明 |
|---|---|
| Array | 数组 |
| Map | 映射 |
| Tuple | 元组 |
| Nested | 嵌套结构 |
| Enum | 枚举 |
-- 创建包含复杂类型的表
CREATE TABLE complex_table (
id UInt32,
tags Array(String),
metadata Map(String, String),
point Tuple(Float32, Float32),
user Nested(
name String,
age UInt32
)
) ENGINE = MergeTree()
ORDER BY id;
-- 插入数据
INSERT INTO complex_table VALUES (
1,
['tag1', 'tag2', 'tag3'],
map('key1', 'value1', 'key2', 'value2'),
(1.0, 2.0),
('John', 25)
);
-- 查询数据
SELECT
id,
tags[1] as first_tag,
metadata['key1'] as value,
point.1 as x,
point.2 as y,
user.name,
user.age
FROM complex_table;
索引
ClickHouse使用稀疏索引,索引粒度可配置。
-- 创建表时设置索引粒度
CREATE TABLE indexed_table (
id UInt32,
date Date,
name String,
value Float32
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, name)
SETTINGS index_granularity = 8192;
-- 创建二级索引
CREATE TABLE table_with_secondary_index (
id UInt32,
date Date,
name String,
value Float32
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, name)
SETTINGS
index_granularity = 8192,
secondary_index_name = value_index,
secondary_index_type = minmax,
secondary_index_granularity = 8192;
数据操作
创建表
-- 创建MergeTree表
CREATE TABLE events (
event_id UInt64,
event_time DateTime,
user_id UInt32,
event_type String,
properties String,
value Float64
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, user_id)
SETTINGS index_granularity = 8192;
-- 创建分布式表
CREATE TABLE distributed_events AS events
ENGINE = Distributed(cluster_name, default, events, rand());
插入数据
-- 单条插入
INSERT INTO events VALUES (
1,
'2023-01-01 12:00:00',
1001,
'click',
'{"page": "home"}',
10.5
);
-- 批量插入
INSERT INTO events VALUES
(1, '2023-01-01 12:00:00', 1001, 'click', '{"page": "home"}', 10.5),
(2, '2023-01-01 12:00:01', 1002, 'view', '{"page": "product"}', 5.0),
(3, '2023-01-01 12:00:02', 1001, 'purchase', '{"product": "item1"}', 100.0);
-- 从文件导入
INSERT INTO events FROM INFILE '/path/to/data.csv' FORMAT CSV;
-- 从查询插入
INSERT INTO events
SELECT
event_id,
event_time,
user_id,
event_type,
properties,
value
FROM source_events;
查询数据
-- 基本查询
SELECT * FROM events LIMIT 10;
-- 条件查询
SELECT * FROM events WHERE event_time >= '2023-01-01' AND event_time < '2023-02-01';
-- 聚合查询
SELECT
event_type,
COUNT(*) as count,
SUM(value) as total_value,
AVG(value) as avg_value
FROM events
GROUP BY event_type;
-- 时间窗口查询
SELECT
toStartOfHour(event_time) as hour,
COUNT(*) as count
FROM events
GROUP BY hour
ORDER BY hour;
-- 窗口函数
SELECT
event_time,
user_id,
value,
SUM(value) OVER (PARTITION BY user_id ORDER BY event_time) as running_total
FROM events;
更新和删除
-- 删除数据
ALTER TABLE events DELETE WHERE event_time < '2023-01-01';
-- 更新数据
ALTER TABLE events UPDATE value = value * 1.1 WHERE event_type = 'purchase';
-- 分区删除
ALTER TABLE events DROP PARTITION '202301';
-- 分区附加
ALTER TABLE events ATTACH PARTITION '202301';
高级功能
物化视图
物化视图自动维护查询结果,提高查询性能。
-- 创建物化视图
CREATE MATERIALIZED VIEW events_mv
ENGINE = SummingMergeTree()
ORDER BY (event_date, event_type)
AS SELECT
toDate(event_time) as event_date,
event_type,
COUNT(*) as count,
SUM(value) as total_value
FROM events
GROUP BY event_date, event_type;
-- 查询物化视图
SELECT * FROM events_mv WHERE event_date = '2023-01-01';
-- 删除物化视图
DROP TABLE events_mv;
字典
字典用于快速查找关联数据。
-- 创建字典
CREATE DICTIONARY user_dict (
user_id UInt32,
user_name String,
user_level UInt8
) PRIMARY KEY user_id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' PASSWORD '' DATABASE 'default' TABLE 'users'))
LIFETIME(300)
LAYOUT(HASHED());
-- 使用字典
SELECT
e.user_id,
dictGet('user_dict', 'user_name', e.user_id) as user_name,
dictGet('user_dict', 'user_level', e.user_id) as user_level
FROM events e;
外部表
外部表用于查询外部数据源。
-- 查询CSV文件
SELECT * FROM file('data.csv', 'CSV', 'id UInt32, name String');
-- 查询JSON文件
SELECT * FROM file('data.json', 'JSONEachRow', 'id UInt32, name String');
-- 查询URL
SELECT * FROM url('http://example.com/data.csv', 'CSV');
函数
聚合函数
-- 基本聚合
SELECT
COUNT(*) as count,
SUM(value) as sum,
AVG(value) as avg,
MIN(value) as min,
MAX(value) as max
FROM events;
-- 条件聚合
SELECT
countIf(event_type = 'click') as click_count,
sumIf(value, event_type = 'purchase') as purchase_sum
FROM events;
-- 顶级聚合
SELECT
quantile(0.5)(value) as median,
quantile(0.9)(value) as p90,
quantile(0.99)(value) as p99
FROM events;
数组函数
-- 数组操作
SELECT
arrayJoin([1, 2, 3]) as value,
arraySum([1, 2, 3]) as sum,
arrayAvg([1, 2, 3]) as avg,
arrayDistinct([1, 2, 2, 3]) as distinct,
arraySort([3, 1, 2]) as sorted;
时间函数
-- 时间操作
SELECT
now() as current_time,
toYYYYMM(now()) as year_month,
toStartOfDay(now()) as start_of_day,
toStartOfHour(now()) as start_of_hour,
dateDiff('day', '2023-01-01', now()) as days_diff,
addDays(now(), 7) as next_week;
字符串函数
-- 字符串操作
SELECT
splitByChar(' ', 'hello world') as words,
arrayJoin(splitByChar(' ', 'hello world')) as word,
length('hello') as len,
substring('hello', 1, 3) as sub,
lower('HELLO') as lower_str,
upper('hello') as upper_str,
replace('hello world', 'world', 'clickhouse') as replaced;
集群部署
集群架构
┌─────────────────────────────────────┐
│ ClickHouse Cluster │
│ ┌────────┐ ┌────────┐ ┌────────┐│
│ │ Node 1 │ │ Node 2 │ │ Node 3 ││
│ │Replica1│ │Replica2│ │Replica3││
│ └────────┘ └────────┘ └────────┘│
└─────────────────────────────────────┘
配置文件
<!-- config.xml -->
<clickhouse>
<listen_host>0.0.0.0</listen_host>
<http_port>8123</http_port>
<tcp_port>9000</tcp_port>
<remote_servers>
<cluster_name>
<shard>
<replica>
<host>node1</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node2</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>node3</host>
<port>9000</port>
</replica>
</shard>
</cluster_name>
</remote_servers>
<zookeeper>
<node>
<host>zookeeper1</host>
<port>2181</port>
</node>
<node>
<host>zookeeper2</host>
<port>2181</port>
</node>
<node>
<host>zookeeper3</host>
<port>2181</port>
</node>
</zookeeper>
</clickhouse>
副本表
-- 创建副本表
CREATE TABLE events_replica (
event_id UInt64,
event_time DateTime,
user_id UInt32,
event_type String,
properties String,
value Float64
) ENGINE = ReplicatedMergeTree(
'/clickhouse/tables/{shard}/events',
'{replica}'
)
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, user_id);
分布式表
-- 创建分布式表
CREATE TABLE events_distributed (
event_id UInt64,
event_time DateTime,
user_id UInt32,
event_type String,
properties String,
value Float64
) ENGINE = Distributed(
cluster_name,
default,
events_replica,
rand()
);
性能优化
表设计优化
-- 使用合适的数据类型
CREATE TABLE optimized_table (
id UInt32, -- 使用UInt32而不是String
date Date, -- 使用Date而不是DateTime
status LowCardinality(String), -- 使用LowCardinality
value Float32 -- 使用Float32而不是Float64
) ENGINE = MergeTree()
ORDER BY id;
-- 合理设置分区
CREATE TABLE partitioned_table (
id UInt32,
date Date,
value Float32
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date) -- 按月分区
ORDER BY (date, id);
-- 合理设置排序键
CREATE TABLE sorted_table (
id UInt32,
date Date,
user_id UInt32,
value Float32
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, user_id, id); -- 查询条件放在前面
查询优化
-- 使用物化视图
CREATE MATERIALIZED VIEW events_mv
ENGINE = SummingMergeTree()
ORDER BY (event_date, event_type)
AS SELECT
toDate(event_time) as event_date,
event_type,
COUNT(*) as count
FROM events
GROUP BY event_date, event_type;
-- 使用预聚合
SELECT * FROM events_mv WHERE event_date = '2023-01-01';
-- 避免全表扫描
SELECT * FROM events WHERE event_date = '2023-01-01' AND user_id = 1001;
-- 使用LIMIT限制结果
SELECT * FROM events LIMIT 1000;
-- 使用PREWHERE
SELECT * FROM events PREWHERE event_date = '2023-01-01' WHERE value > 100;
配置优化
<!-- config.xml -->
<clickhouse>
<max_memory_usage>10000000000</max_memory_usage>
<max_concurrent_queries>100</max_concurrent_queries>
<max_threads>8</max_threads>
<merge_tree>
<max_suspicious_broken_parts>5</max_suspicious_broken_parts>
<parts_to_throw_insert>10000</parts_to_throw_insert>
<parts_to_delay_insert>15000</parts_to_delay_insert>
</merge_tree>
<compression>
<case>
<min_part_size>1000000000</min_part_size>
<min_part_size_ratio>0.01</min_part_size_ratio>
<method>zstd</method>
</case>
</compression>
</clickhouse>
系统优化
# 增加文件描述符限制
ulimit -n 65535
# 调整内核参数
echo "vm.max_map_count=262144" >> /etc/sysctl.conf
echo "vm.swappiness=1" >> /etc/sysctl.conf
sysctl -p
# 使用SSD存储
# 将数据目录挂载到SSD
监控与运维
系统表
-- 查看表信息
SELECT * FROM system.tables WHERE database = 'default';
-- 查看分区信息
SELECT
table,
partition,
rows,
bytes_on_disk
FROM system.parts
WHERE table = 'events';
-- 查看查询信息
SELECT
query,
duration,
read_rows,
read_bytes,
memory_usage
FROM system.query_log
WHERE type = 'QueryFinish'
ORDER BY duration DESC
LIMIT 10;
-- 查看进程信息
SELECT * FROM system.processes;
性能监控
-- 查看集群信息
SELECT * FROM system.clusters;
-- 查看副本信息
SELECT * FROM system.replicas;
-- 查看Zookeeper信息
SELECT * FROM system.zookeeper;
-- 查看磁盘使用情况
SELECT
name,
path,
free_space,
total_space,
keep_free_space
FROM system.disks;
维护操作
-- 优化表
OPTIMIZE TABLE events FINAL;
-- 检查表
CHECK TABLE events;
-- 删除旧分区
ALTER TABLE events DROP PARTITION '202301';
-- 修改表设置
ALTER TABLE events MODIFY SETTING max_bytes_to_merge_at_once = 1073741824;
实战案例
用户行为分析
-- 创建用户行为表
CREATE TABLE user_events (
event_id UInt64,
event_time DateTime,
user_id UInt32,
event_type String,
page String,
referrer String,
device String,
os String,
browser String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, user_id);
-- 创建物化视图
CREATE MATERIALIZED VIEW user_events_daily_mv
ENGINE = SummingMergeTree()
ORDER BY (event_date, event_type)
AS SELECT
toDate(event_time) as event_date,
event_type,
COUNT(*) as count
FROM user_events
GROUP BY event_date, event_type;
-- 查询每日事件统计
SELECT
event_date,
event_type,
count
FROM user_events_daily_mv
WHERE event_date >= '2023-01-01'
ORDER BY event_date, event_type;
-- 查询用户留存
SELECT
toDate(event_time) as date,
uniqExact(user_id) as dau,
countIf(user_id IN (
SELECT user_id
FROM user_events
WHERE event_time >= date - INTERVAL 1 DAY
GROUP BY user_id
)) as retained_users
FROM user_events
GROUP BY date;
实时监控
-- 创建监控表
CREATE TABLE metrics (
timestamp DateTime,
metric_name String,
metric_value Float64,
tags Map(String, String)
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (timestamp, metric_name);
-- 创建物化视图
CREATE MATERIALIZED VIEW metrics_5min_mv
ENGINE = AggregatingMergeTree()
ORDER BY (window_start, metric_name)
AS SELECT
toStartOfFiveMinutes(timestamp) as window_start,
metric_name,
avgState(metric_value) as avg_value,
maxState(metric_value) as max_value,
minState(metric_value) as min_value,
countState(metric_value) as count
FROM metrics
GROUP BY window_start, metric_name;
-- 查询实时指标
SELECT
window_start,
metric_name,
avgMerge(avg_value) as avg_value,
maxMerge(max_value) as max_value,
minMerge(min_value) as min_value,
countMerge(count_value) as count
FROM metrics_5min_mv
WHERE window_start >= now() - INTERVAL 1 HOUR
ORDER BY window_start, metric_name;
A/B测试分析
-- 创建实验表
CREATE TABLE experiments (
experiment_id String,
user_id UInt32,
variant String,
timestamp DateTime,
metric_name String,
metric_value Float64
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (experiment_id, timestamp, user_id);
-- 分析实验结果
SELECT
experiment_id,
variant,
COUNT(DISTINCT user_id) as user_count,
avg(metric_value) as avg_value,
quantile(0.5)(metric_value) as median,
quantile(0.9)(metric_value) as p90,
quantile(0.99)(metric_value) as p99
FROM experiments
WHERE metric_name = 'revenue'
GROUP BY experiment_id, variant;
-- 显著性检验
SELECT
experiment_id,
variant_a,
variant_b,
(avg_a - avg_b) / sqrt(var_a / count_a + var_b / count_b) as z_score,
1 - normalCDF(abs(z_score)) as p_value
FROM (
SELECT
experiment_id,
groupArray(variant) as variants,
groupArray(avg_value) as avg_values,
groupArray(varPop(metric_value)) as var_values,
groupArray(count(*)) as counts
FROM experiments
WHERE metric_name = 'revenue'
GROUP BY experiment_id
) ARRAY JOIN
variants[1] as variant_a,
avg_values[1] as avg_a,
var_values[1] as var_a,
counts[1] as count_a,
variants[2] as variant_b,
avg_values[2] as avg_b,
var_values[2] as var_b,
counts[2] as count_b;
最佳实践
表设计
- 选择合适的引擎:MergeTree系列适合大多数场景
- 合理分区:按时间分区,避免过多分区
- 优化排序键:查询条件放在前面
- 使用合适的数据类型:避免浪费空间
- 使用物化视图:预聚合常用查询
查询优化
- 避免全表扫描:使用分区和索引
- 使用物化视图:预聚合结果
- 限制结果数量:使用LIMIT
- 使用PREWHERE:先过滤再处理
- 避免JOIN:使用字典替代
集群管理
- 合理分片:根据数据量和查询模式
- 使用副本:提高可用性
- 监控集群:及时发现异常
- 定期维护:优化表和分区
- 备份重要数据:防止数据丢失
安全配置
- 启用认证:配置用户和权限
- 限制访问:使用防火墙和网络隔离
- 加密传输:使用SSL/TLS
- 审计日志:记录操作日志
- 定期更新:及时升级版本
总结
ClickHouse是一个功能强大、性能卓越的列式数据库。通过本文的介绍,我们了解了:
- ClickHouse概述:核心特性和应用场景
- 核心概念:数据库引擎、表引擎、数据类型、索引
- 数据操作:创建表、插入数据、查询数据、更新删除
- 高级功能:物化视图、字典、外部表、函数
- 集群部署:集群架构、配置文件、副本表、分布式表
- 性能优化:表设计、查询优化、配置优化、系统优化
- 监控运维:系统表、性能监控、维护操作
- 实战案例:用户行为分析、实时监控、A/B测试分析
- 最佳实践:表设计、查询优化、集群管理、安全配置
掌握ClickHouse可以帮助我们构建高性能的实时数据分析系统,解决海量数据的快速查询和分析问题。在实际应用中,建议根据具体场景选择合适的表引擎和优化策略,以达到最佳的性能和用户体验。