ClickHouse深度解析

ClickHouse概述

ClickHouse是一个面向列的OLAP(在线分析处理)数据库管理系统,由Yandex开源。它以高性能、低延迟、高压缩比著称,是实时数据分析领域的明星产品。

ClickHouse的核心特性

  • 高性能:单表查询可达数十亿行/秒
  • 列式存储:适合OLAP场景
  • 高压缩比:数据压缩率可达10:1
  • 实时分析:支持实时数据写入和查询
  • SQL支持:兼容标准SQL语法
  • 分布式:支持水平扩展
  • 容错:支持副本和分片

ClickHouse的应用场景

  • 用户行为分析
  • 日志分析
  • 监控数据分析
  • 商业智能(BI)
  • 实时报表
  • 广告投放分析
  • 金融风控

核心概念

数据库引擎

ClickHouse支持多种数据库引擎:

引擎 说明
Ordinary 默认引擎,支持所有表引擎
Lazy 延迟加载,内存不足时卸载数据
MySQL 映射到MySQL数据库
PostgreSQL 映射到PostgreSQL数据库
MaterializedMySQL 实时同步MySQL数据
-- 创建数据库
CREATE DATABASE my_database;

-- 使用Lazy引擎
CREATE DATABASE my_lazy_db ENGINE = Lazy;

-- 映射MySQL数据库
CREATE DATABASE mysql_db ENGINE = MySQL('localhost:3306', 'db', 'user', 'password');

表引擎

表引擎是ClickHouse最重要的概念,决定了数据的存储方式和查询性能。

MergeTree系列

引擎 说明
MergeTree 基础引擎,支持索引和分区
ReplacingMergeTree 去重引擎
SummingMergeTree 聚合引擎
AggregatingMergeTree 高级聚合引擎
CollapsingMergeTree 折叠引擎
VersionedCollapsingMergeTree 版本折叠引擎
GraphiteMergeTree Graphite数据引擎
-- 创建MergeTree表
CREATE TABLE my_table (
    id UInt32,
    date Date,
    name String,
    value Float32
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, name)
SETTINGS index_granularity = 8192;

-- 创建ReplacingMergeTree表(去重)
CREATE TABLE dedup_table (
    id UInt32,
    name String,
    version UInt32
) ENGINE = ReplacingMergeTree(version)
ORDER BY id;

-- 创建SummingMergeTree表(聚合)
CREATE TABLE sum_table (
    date Date,
    key String,
    value UInt32
) ENGINE = SummingMergeTree(value)
ORDER BY (date, key);

Log系列

引擎 说明
TinyLog 最简单的引擎,不支持索引
StripeLog 支持索引,性能优于TinyLog
Log 支持索引,读写分离
-- 创建TinyLog表
CREATE TABLE tiny_log_table (
    id UInt32,
    name String
) ENGINE = TinyLog;

Integration系列

引擎 说明
HDFS 读取HDFS文件
S3 读取S3对象存储
MySQL 读取MySQL表
PostgreSQL 读取PostgreSQL表
Kafka 读取Kafka消息
-- 创建HDFS表
CREATE TABLE hdfs_table (
    id UInt32,
    name String
) ENGINE = HDFS('hdfs://localhost:9000/data/*.csv', 'CSV');

-- 创建Kafka表
CREATE TABLE kafka_table (
    id UInt32,
    name String
) ENGINE = Kafka('localhost:9092', 'topic', 'group', 'CSV');

数据类型

基本类型

类型 说明 示例
UInt8 无符号8位整数 0-255
UInt16 无符号16位整数 0-65535
UInt32 无符号32位整数 0-4294967295
UInt64 无符号64位整数 0-18446744073709551615
Int8 有符号8位整数 -128-127
Int16 有符号16位整数 -32768-32767
Int32 有符号32位整数 -2147483648-2147483647
Int64 有符号64位整数 -9223372036854775808-9223372036854775807
Float32 单精度浮点数 3.14
Float64 双精度浮点数 3.1415926535
String 字符串 "hello"
FixedString(N) 固定长度字符串 "hello"
Date 日期 2023-01-01
DateTime 日期时间 2023-01-01 12:00:00

复杂类型

类型 说明
Array 数组
Map 映射
Tuple 元组
Nested 嵌套结构
Enum 枚举
-- 创建包含复杂类型的表
CREATE TABLE complex_table (
    id UInt32,
    tags Array(String),
    metadata Map(String, String),
    point Tuple(Float32, Float32),
    user Nested(
        name String,
        age UInt32
    )
) ENGINE = MergeTree()
ORDER BY id;

-- 插入数据
INSERT INTO complex_table VALUES (
    1,
    ['tag1', 'tag2', 'tag3'],
    map('key1', 'value1', 'key2', 'value2'),
    (1.0, 2.0),
    ('John', 25)
);

-- 查询数据
SELECT
    id,
    tags[1] as first_tag,
    metadata['key1'] as value,
    point.1 as x,
    point.2 as y,
    user.name,
    user.age
FROM complex_table;

索引

ClickHouse使用稀疏索引,索引粒度可配置。

-- 创建表时设置索引粒度
CREATE TABLE indexed_table (
    id UInt32,
    date Date,
    name String,
    value Float32
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, name)
SETTINGS index_granularity = 8192;

-- 创建二级索引
CREATE TABLE table_with_secondary_index (
    id UInt32,
    date Date,
    name String,
    value Float32
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, name)
SETTINGS
    index_granularity = 8192,
    secondary_index_name = value_index,
    secondary_index_type = minmax,
    secondary_index_granularity = 8192;

数据操作

创建表

-- 创建MergeTree表
CREATE TABLE events (
    event_id UInt64,
    event_time DateTime,
    user_id UInt32,
    event_type String,
    properties String,
    value Float64
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, user_id)
SETTINGS index_granularity = 8192;

-- 创建分布式表
CREATE TABLE distributed_events AS events
ENGINE = Distributed(cluster_name, default, events, rand());

插入数据

-- 单条插入
INSERT INTO events VALUES (
    1,
    '2023-01-01 12:00:00',
    1001,
    'click',
    '{"page": "home"}',
    10.5
);

-- 批量插入
INSERT INTO events VALUES
    (1, '2023-01-01 12:00:00', 1001, 'click', '{"page": "home"}', 10.5),
    (2, '2023-01-01 12:00:01', 1002, 'view', '{"page": "product"}', 5.0),
    (3, '2023-01-01 12:00:02', 1001, 'purchase', '{"product": "item1"}', 100.0);

-- 从文件导入
INSERT INTO events FROM INFILE '/path/to/data.csv' FORMAT CSV;

-- 从查询插入
INSERT INTO events
SELECT
    event_id,
    event_time,
    user_id,
    event_type,
    properties,
    value
FROM source_events;

查询数据

-- 基本查询
SELECT * FROM events LIMIT 10;

-- 条件查询
SELECT * FROM events WHERE event_time >= '2023-01-01' AND event_time < '2023-02-01';

-- 聚合查询
SELECT
    event_type,
    COUNT(*) as count,
    SUM(value) as total_value,
    AVG(value) as avg_value
FROM events
GROUP BY event_type;

-- 时间窗口查询
SELECT
    toStartOfHour(event_time) as hour,
    COUNT(*) as count
FROM events
GROUP BY hour
ORDER BY hour;

-- 窗口函数
SELECT
    event_time,
    user_id,
    value,
    SUM(value) OVER (PARTITION BY user_id ORDER BY event_time) as running_total
FROM events;

更新和删除

-- 删除数据
ALTER TABLE events DELETE WHERE event_time < '2023-01-01';

-- 更新数据
ALTER TABLE events UPDATE value = value * 1.1 WHERE event_type = 'purchase';

-- 分区删除
ALTER TABLE events DROP PARTITION '202301';

-- 分区附加
ALTER TABLE events ATTACH PARTITION '202301';

高级功能

物化视图

物化视图自动维护查询结果,提高查询性能。

-- 创建物化视图
CREATE MATERIALIZED VIEW events_mv
ENGINE = SummingMergeTree()
ORDER BY (event_date, event_type)
AS SELECT
    toDate(event_time) as event_date,
    event_type,
    COUNT(*) as count,
    SUM(value) as total_value
FROM events
GROUP BY event_date, event_type;

-- 查询物化视图
SELECT * FROM events_mv WHERE event_date = '2023-01-01';

-- 删除物化视图
DROP TABLE events_mv;

字典

字典用于快速查找关联数据。

-- 创建字典
CREATE DICTIONARY user_dict (
    user_id UInt32,
    user_name String,
    user_level UInt8
) PRIMARY KEY user_id
SOURCE(CLICKHOUSE(HOST 'localhost' PORT 9000 USER 'default' PASSWORD '' DATABASE 'default' TABLE 'users'))
LIFETIME(300)
LAYOUT(HASHED());

-- 使用字典
SELECT
    e.user_id,
    dictGet('user_dict', 'user_name', e.user_id) as user_name,
    dictGet('user_dict', 'user_level', e.user_id) as user_level
FROM events e;

外部表

外部表用于查询外部数据源。

-- 查询CSV文件
SELECT * FROM file('data.csv', 'CSV', 'id UInt32, name String');

-- 查询JSON文件
SELECT * FROM file('data.json', 'JSONEachRow', 'id UInt32, name String');

-- 查询URL
SELECT * FROM url('http://example.com/data.csv', 'CSV');

函数

聚合函数

-- 基本聚合
SELECT
    COUNT(*) as count,
    SUM(value) as sum,
    AVG(value) as avg,
    MIN(value) as min,
    MAX(value) as max
FROM events;

-- 条件聚合
SELECT
    countIf(event_type = 'click') as click_count,
    sumIf(value, event_type = 'purchase') as purchase_sum
FROM events;

-- 顶级聚合
SELECT
    quantile(0.5)(value) as median,
    quantile(0.9)(value) as p90,
    quantile(0.99)(value) as p99
FROM events;

数组函数

-- 数组操作
SELECT
    arrayJoin([1, 2, 3]) as value,
    arraySum([1, 2, 3]) as sum,
    arrayAvg([1, 2, 3]) as avg,
    arrayDistinct([1, 2, 2, 3]) as distinct,
    arraySort([3, 1, 2]) as sorted;

时间函数

-- 时间操作
SELECT
    now() as current_time,
    toYYYYMM(now()) as year_month,
    toStartOfDay(now()) as start_of_day,
    toStartOfHour(now()) as start_of_hour,
    dateDiff('day', '2023-01-01', now()) as days_diff,
    addDays(now(), 7) as next_week;

字符串函数

-- 字符串操作
SELECT
    splitByChar(' ', 'hello world') as words,
    arrayJoin(splitByChar(' ', 'hello world')) as word,
    length('hello') as len,
    substring('hello', 1, 3) as sub,
    lower('HELLO') as lower_str,
    upper('hello') as upper_str,
    replace('hello world', 'world', 'clickhouse') as replaced;

集群部署

集群架构

┌─────────────────────────────────────┐
│         ClickHouse Cluster          │
│  ┌────────┐  ┌────────┐  ┌────────┐│
│  │ Node 1 │  │ Node 2 │  │ Node 3 ││
│  │Replica1│  │Replica2│  │Replica3││
│  └────────┘  └────────┘  └────────┘│
└─────────────────────────────────────┘

配置文件

<!-- config.xml -->
<clickhouse>
    <listen_host>0.0.0.0</listen_host>
    <http_port>8123</http_port>
    <tcp_port>9000</tcp_port>

    <remote_servers>
        <cluster_name>
            <shard>
                <replica>
                    <host>node1</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>node2</host>
                    <port>9000</port>
                </replica>
            </shard>
            <shard>
                <replica>
                    <host>node3</host>
                    <port>9000</port>
                </replica>
            </shard>
        </cluster_name>
    </remote_servers>

    <zookeeper>
        <node>
            <host>zookeeper1</host>
            <port>2181</port>
        </node>
        <node>
            <host>zookeeper2</host>
            <port>2181</port>
        </node>
        <node>
            <host>zookeeper3</host>
            <port>2181</port>
        </node>
    </zookeeper>
</clickhouse>

副本表

-- 创建副本表
CREATE TABLE events_replica (
    event_id UInt64,
    event_time DateTime,
    user_id UInt32,
    event_type String,
    properties String,
    value Float64
) ENGINE = ReplicatedMergeTree(
    '/clickhouse/tables/{shard}/events',
    '{replica}'
)
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, user_id);

分布式表

-- 创建分布式表
CREATE TABLE events_distributed (
    event_id UInt64,
    event_time DateTime,
    user_id UInt32,
    event_type String,
    properties String,
    value Float64
) ENGINE = Distributed(
    cluster_name,
    default,
    events_replica,
    rand()
);

性能优化

表设计优化

-- 使用合适的数据类型
CREATE TABLE optimized_table (
    id UInt32,              -- 使用UInt32而不是String
    date Date,               -- 使用Date而不是DateTime
    status LowCardinality(String),  -- 使用LowCardinality
    value Float32           -- 使用Float32而不是Float64
) ENGINE = MergeTree()
ORDER BY id;

-- 合理设置分区
CREATE TABLE partitioned_table (
    id UInt32,
    date Date,
    value Float32
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)  -- 按月分区
ORDER BY (date, id);

-- 合理设置排序键
CREATE TABLE sorted_table (
    id UInt32,
    date Date,
    user_id UInt32,
    value Float32
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(date)
ORDER BY (date, user_id, id);  -- 查询条件放在前面

查询优化

-- 使用物化视图
CREATE MATERIALIZED VIEW events_mv
ENGINE = SummingMergeTree()
ORDER BY (event_date, event_type)
AS SELECT
    toDate(event_time) as event_date,
    event_type,
    COUNT(*) as count
FROM events
GROUP BY event_date, event_type;

-- 使用预聚合
SELECT * FROM events_mv WHERE event_date = '2023-01-01';

-- 避免全表扫描
SELECT * FROM events WHERE event_date = '2023-01-01' AND user_id = 1001;

-- 使用LIMIT限制结果
SELECT * FROM events LIMIT 1000;

-- 使用PREWHERE
SELECT * FROM events PREWHERE event_date = '2023-01-01' WHERE value > 100;

配置优化

<!-- config.xml -->
<clickhouse>
    <max_memory_usage>10000000000</max_memory_usage>
    <max_concurrent_queries>100</max_concurrent_queries>
    <max_threads>8</max_threads>

    <merge_tree>
        <max_suspicious_broken_parts>5</max_suspicious_broken_parts>
        <parts_to_throw_insert>10000</parts_to_throw_insert>
        <parts_to_delay_insert>15000</parts_to_delay_insert>
    </merge_tree>

    <compression>
        <case>
            <min_part_size>1000000000</min_part_size>
            <min_part_size_ratio>0.01</min_part_size_ratio>
            <method>zstd</method>
        </case>
    </compression>
</clickhouse>

系统优化

# 增加文件描述符限制
ulimit -n 65535

# 调整内核参数
echo "vm.max_map_count=262144" >> /etc/sysctl.conf
echo "vm.swappiness=1" >> /etc/sysctl.conf
sysctl -p

# 使用SSD存储
# 将数据目录挂载到SSD

监控与运维

系统表

-- 查看表信息
SELECT * FROM system.tables WHERE database = 'default';

-- 查看分区信息
SELECT
    table,
    partition,
    rows,
    bytes_on_disk
FROM system.parts
WHERE table = 'events';

-- 查看查询信息
SELECT
    query,
    duration,
    read_rows,
    read_bytes,
    memory_usage
FROM system.query_log
WHERE type = 'QueryFinish'
ORDER BY duration DESC
LIMIT 10;

-- 查看进程信息
SELECT * FROM system.processes;

性能监控

-- 查看集群信息
SELECT * FROM system.clusters;

-- 查看副本信息
SELECT * FROM system.replicas;

-- 查看Zookeeper信息
SELECT * FROM system.zookeeper;

-- 查看磁盘使用情况
SELECT
    name,
    path,
    free_space,
    total_space,
    keep_free_space
FROM system.disks;

维护操作

-- 优化表
OPTIMIZE TABLE events FINAL;

-- 检查表
CHECK TABLE events;

-- 删除旧分区
ALTER TABLE events DROP PARTITION '202301';

-- 修改表设置
ALTER TABLE events MODIFY SETTING max_bytes_to_merge_at_once = 1073741824;

实战案例

用户行为分析

-- 创建用户行为表
CREATE TABLE user_events (
    event_id UInt64,
    event_time DateTime,
    user_id UInt32,
    event_type String,
    page String,
    referrer String,
    device String,
    os String,
    browser String
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(event_time)
ORDER BY (event_time, user_id);

-- 创建物化视图
CREATE MATERIALIZED VIEW user_events_daily_mv
ENGINE = SummingMergeTree()
ORDER BY (event_date, event_type)
AS SELECT
    toDate(event_time) as event_date,
    event_type,
    COUNT(*) as count
FROM user_events
GROUP BY event_date, event_type;

-- 查询每日事件统计
SELECT
    event_date,
    event_type,
    count
FROM user_events_daily_mv
WHERE event_date >= '2023-01-01'
ORDER BY event_date, event_type;

-- 查询用户留存
SELECT
    toDate(event_time) as date,
    uniqExact(user_id) as dau,
    countIf(user_id IN (
        SELECT user_id
        FROM user_events
        WHERE event_time >= date - INTERVAL 1 DAY
        GROUP BY user_id
    )) as retained_users
FROM user_events
GROUP BY date;

实时监控

-- 创建监控表
CREATE TABLE metrics (
    timestamp DateTime,
    metric_name String,
    metric_value Float64,
    tags Map(String, String)
) ENGINE = MergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (timestamp, metric_name);

-- 创建物化视图
CREATE MATERIALIZED VIEW metrics_5min_mv
ENGINE = AggregatingMergeTree()
ORDER BY (window_start, metric_name)
AS SELECT
    toStartOfFiveMinutes(timestamp) as window_start,
    metric_name,
    avgState(metric_value) as avg_value,
    maxState(metric_value) as max_value,
    minState(metric_value) as min_value,
    countState(metric_value) as count
FROM metrics
GROUP BY window_start, metric_name;

-- 查询实时指标
SELECT
    window_start,
    metric_name,
    avgMerge(avg_value) as avg_value,
    maxMerge(max_value) as max_value,
    minMerge(min_value) as min_value,
    countMerge(count_value) as count
FROM metrics_5min_mv
WHERE window_start >= now() - INTERVAL 1 HOUR
ORDER BY window_start, metric_name;

A/B测试分析

-- 创建实验表
CREATE TABLE experiments (
    experiment_id String,
    user_id UInt32,
    variant String,
    timestamp DateTime,
    metric_name String,
    metric_value Float64
) ENGINE = MergeTree()
PARTITION BY toYYYYMM(timestamp)
ORDER BY (experiment_id, timestamp, user_id);

-- 分析实验结果
SELECT
    experiment_id,
    variant,
    COUNT(DISTINCT user_id) as user_count,
    avg(metric_value) as avg_value,
    quantile(0.5)(metric_value) as median,
    quantile(0.9)(metric_value) as p90,
    quantile(0.99)(metric_value) as p99
FROM experiments
WHERE metric_name = 'revenue'
GROUP BY experiment_id, variant;

-- 显著性检验
SELECT
    experiment_id,
    variant_a,
    variant_b,
    (avg_a - avg_b) / sqrt(var_a / count_a + var_b / count_b) as z_score,
    1 - normalCDF(abs(z_score)) as p_value
FROM (
    SELECT
        experiment_id,
        groupArray(variant) as variants,
        groupArray(avg_value) as avg_values,
        groupArray(varPop(metric_value)) as var_values,
        groupArray(count(*)) as counts
    FROM experiments
    WHERE metric_name = 'revenue'
    GROUP BY experiment_id
) ARRAY JOIN
    variants[1] as variant_a,
    avg_values[1] as avg_a,
    var_values[1] as var_a,
    counts[1] as count_a,
    variants[2] as variant_b,
    avg_values[2] as avg_b,
    var_values[2] as var_b,
    counts[2] as count_b;

最佳实践

表设计

  1. 选择合适的引擎:MergeTree系列适合大多数场景
  2. 合理分区:按时间分区,避免过多分区
  3. 优化排序键:查询条件放在前面
  4. 使用合适的数据类型:避免浪费空间
  5. 使用物化视图:预聚合常用查询

查询优化

  1. 避免全表扫描:使用分区和索引
  2. 使用物化视图:预聚合结果
  3. 限制结果数量:使用LIMIT
  4. 使用PREWHERE:先过滤再处理
  5. 避免JOIN:使用字典替代

集群管理

  1. 合理分片:根据数据量和查询模式
  2. 使用副本:提高可用性
  3. 监控集群:及时发现异常
  4. 定期维护:优化表和分区
  5. 备份重要数据:防止数据丢失

安全配置

  1. 启用认证:配置用户和权限
  2. 限制访问:使用防火墙和网络隔离
  3. 加密传输:使用SSL/TLS
  4. 审计日志:记录操作日志
  5. 定期更新:及时升级版本

总结

ClickHouse是一个功能强大、性能卓越的列式数据库。通过本文的介绍,我们了解了:

  1. ClickHouse概述:核心特性和应用场景
  2. 核心概念:数据库引擎、表引擎、数据类型、索引
  3. 数据操作:创建表、插入数据、查询数据、更新删除
  4. 高级功能:物化视图、字典、外部表、函数
  5. 集群部署:集群架构、配置文件、副本表、分布式表
  6. 性能优化:表设计、查询优化、配置优化、系统优化
  7. 监控运维:系统表、性能监控、维护操作
  8. 实战案例:用户行为分析、实时监控、A/B测试分析
  9. 最佳实践:表设计、查询优化、集群管理、安全配置

掌握ClickHouse可以帮助我们构建高性能的实时数据分析系统,解决海量数据的快速查询和分析问题。在实际应用中,建议根据具体场景选择合适的表引擎和优化策略,以达到最佳的性能和用户体验。