实时分析

在本教程中,我们将演示如何使用 Citus 获取事件数据,并对该数据实时运行分析查询。为此,我们将使用一个示例 Github 事件数据集。

本教程假设您已经安装并运行了 Citus。

数据模型和样本数据

我们将演示为实时分析应用程序构建数据库。该应用程序将插入大量事件数据,并以亚秒级延迟对这些数据进行分析查询。在我们的示例中,我们将使用 Github 事件数据集。该数据集包括 Github 上的所有公共事件,例如 commits、forks、new issues 以及对这些问题的 comments。

我们将使用两个 Postgres 表来表示这些数据。要开始使用,您需要下载这些表的示例数据:

curl https://examples.citusdata.com/tutorial/users.csv > users.csv
curl https://examples.citusdata.com/tutorial/events.csv > events.csv

如果您使用 Docker,则应使用 docker cp 命令将文件复制到 Docker 容器中。

docker cp users.csv citus:.
docker cp events.csv citus:.

创建表

首先,您可以先使用 psql 连接到 Citus coordinator。

如果您使用原生 Postgres,如我们的单节点 Citus 指南中安装的那样,coordinator 节点将在端口 9700 上运行。

psql -p 9700

如果您使用的是 Docker,则可以通过使用 docker exec 命令运行 psql 进行连接:

docker exec -it citus psql -U postgres

然后,您可以使用标准 PostgreSQL CREATE TABLE 命令创建表。

CREATE TABLE github_events
(
    event_id bigint,
    event_type text,
    event_public boolean,
    repo_id bigint,
    payload jsonb,
    repo jsonb,
    user_id bigint,
    org jsonb,
    created_at timestamp
);

CREATE TABLE github_users
(
    user_id bigint,
    url text,
    login text,
    avatar_url text,
    gravatar_id text,
    display_login text
);

接下来,您可以像在 PostgreSQL 中那样为事件数据创建索引。在本例中,我们还将创建一个 GIN 索引以更快地查询 jsonb 字段。

CREATE INDEX event_type_index ON github_events (event_type);
CREATE INDEX payload_index ON github_events USING GIN (payload jsonb_path_ops);

分发表和加载数据

我们现在将继续告诉 Citus 将这些表分布到集群中的节点上。为此,您可以运行 create_distributed_table 并指定要分片的表和要分片的列。在这种情况下,我们将对 user_id 上的所有表进行分片。

SELECT create_distributed_table('github_users', 'user_id');
SELECT create_distributed_table('github_events', 'user_id');

对用户标识符上的所有表进行分片允许 Citus 将这些表 colocate 在一起,并允许有效的连接和分布式汇总。您可以在此处了解有关此方法的好处的更多信息。

然后,您可以继续使用标准 PostgreSQL \COPY 命令将我们下载的数据加载到表中。如果您将文件下载到其他位置,请确保指定正确的文件路径。

\copy github_users from 'users.csv' with csv
\copy github_events from 'events.csv' with csv

运行查询

现在我们已经将数据加载到表中,让我们继续运行一些查询。首先,让我们检查一下分布式数据库中有多少用户。

SELECT count(*) FROM github_users;

现在,让我们分析一下我们数据中的 Github 推送事件。我们将首先通过使用每个推送事件中不同提交的数量来计算每分钟的提交数量。

SELECT date_trunc('minute', created_at) AS minute,
       sum((payload->>'distinct_size')::int) AS num_commits
FROM github_events
WHERE event_type = 'PushEvent'
GROUP BY minute
ORDER BY minute;

我们还有一个用户表。我们还可以轻松地 join 用户(github_users)与事件(github_events),并找到创建最多存储库的前十名用户。

SELECT login, count(*)
FROM github_events ge
JOIN github_users gu
ON ge.user_id = gu.user_id
WHERE event_type = 'CreateEvent' AND payload @> '{"ref_type": "repository"}'
GROUP BY login
ORDER BY count(*) DESC LIMIT 10;

Citus 还支持用于摄取和修改数据的标准 INSERTUPDATEDELETE 命令。例如,您可以通过运行以下命令来更新用户的 display login

UPDATE github_users SET display_login = 'no1youknow' WHERE user_id = 24305673;

至此,我们的教程结束了。下一步,您可以查看实时应用部分,了解如何为自己的数据建模并为实时分析应用程序提供动力。