Коннектор Spark
Этот коннектор использует оптимизации, специфичные для ClickHouse, такие как продвинутое разбиение на партиции и проталкивание предикатов (predicate pushdown), чтобы улучшить производительность запросов и обработку данных. Коннектор основан на официальном JDBC-коннекторе ClickHouse и управляет собственным каталогом.
До версии Spark 3.0 в Spark не было встроенной концепции каталога, поэтому пользователи обычно полагались на внешние системы каталогов, такие как Hive Metastore или AWS Glue. При использовании этих внешних решений пользователям приходилось вручную регистрировать таблицы источников данных, прежде чем получать к ним доступ в Spark. Однако с тех пор, как в Spark 3.0 была введена концепция каталога, Spark теперь может автоматически обнаруживать таблицы посредством регистрации плагинов каталогов.
Каталог по умолчанию в Spark — spark_catalog, а таблицы идентифицируются как {catalog name}.{database}.{table}. С новой
возможностью работы с каталогами теперь можно добавлять несколько каталогов и работать с ними в одном приложении Spark.
Требования
- Java 8 или 17 (для Spark 4.0 требуется Java 17+)
- Scala 2.12 или 2.13 (Spark 4.0 поддерживает только Scala 2.13)
- Apache Spark 3.3, 3.4, 3.5 или 4.0
Матрица совместимости
| Версия | Совместимые версии Spark | Версия JDBC-драйвера ClickHouse |
|---|---|---|
| main | Spark 3.3, 3.4, 3.5, 4.0 | 0.9.4 |
| 0.9.0 | Spark 3.3, 3.4, 3.5, 4.0 | 0.9.4 |
| 0.8.1 | Spark 3.3, 3.4, 3.5 | 0.6.3 |
| 0.7.3 | Spark 3.3, 3.4 | 0.4.6 |
| 0.6.0 | Spark 3.3 | 0.3.2-patch11 |
| 0.5.0 | Spark 3.2, 3.3 | 0.3.2-patch11 |
| 0.4.0 | Spark 3.2, 3.3 | Нет зависимости |
| 0.3.0 | Spark 3.2, 3.3 | Нет зависимости |
| 0.2.1 | Spark 3.2 | Нет зависимости |
| 0.1.2 | Spark 3.2 | Нет зависимости |
Установка и настройка
Для интеграции ClickHouse со Spark доступно несколько вариантов установки, подходящих для разных конфигураций проектов.
Вы можете добавить коннектор ClickHouse для Spark как зависимость непосредственно в файл сборки вашего проекта (например, в pom.xml
для Maven или build.sbt для SBT).
Либо вы можете поместить необходимые JAR-файлы в каталог $SPARK_HOME/jars/ или передать их напрямую как параметр Spark
с помощью флага --jars в команде spark-submit.
Оба подхода обеспечивают доступность коннектора ClickHouse в вашей среде Spark.
Импорт как зависимость
- Maven
- Gradle
- SBT
- Spark SQL/Shell CLI
Добавьте следующий репозиторий, если вы хотите использовать версию SNAPSHOT.
Добавьте следующий репозиторий, если вы хотите использовать версию SNAPSHOT:
При использовании опций оболочки Spark (Spark SQL CLI, Spark Shell CLI и команды Spark Submit) зависимости можно зарегистрировать, передав необходимые JAR-файлы:
Если вы хотите избежать копирования JAR-файлов на клиентский узел Spark, вместо этого можно использовать следующее:
Примечание: для сценариев, где используется только SQL, в продакшене рекомендуется Apache Kyuubi.
Скачайте библиотеку
Шаблон имени бинарного JAR-файла:
Вы можете найти все доступные релизные JAR‑файлы в Maven Central Repository и все ежедневные SNAPSHOT‑сборки JAR‑файлов в Sonatype OSS Snapshots Repository.
Крайне важно включить clickhouse-jdbc JAR с классификатором «all», так как коннектор зависит от clickhouse-http и clickhouse-client, — оба они входят в clickhouse-jdbc:all. В качестве альтернативы вы можете добавить clickhouse-client JAR и clickhouse-http по отдельности, если предпочитаете не использовать полный JDBC‑пакет.
В любом случае убедитесь, что версии пакетов совместимы в соответствии с матрицей совместимости.
Зарегистрируйте каталог (обязательно)
Чтобы получить доступ к своим таблицам ClickHouse, необходимо настроить новый каталог Spark со следующими параметрами:
| Свойство | Значение | Значение по умолчанию | Обязательно |
|---|---|---|---|
spark.sql.catalog.<catalog_name> | com.clickhouse.spark.ClickHouseCatalog | N/A | Yes |
spark.sql.catalog.<catalog_name>.host | <clickhouse_host> | localhost | No |
spark.sql.catalog.<catalog_name>.protocol | http | http | No |
spark.sql.catalog.<catalog_name>.http_port | <clickhouse_port> | 8123 | No |
spark.sql.catalog.<catalog_name>.user | <clickhouse_username> | default | No |
spark.sql.catalog.<catalog_name>.password | <clickhouse_password> | (пустая строка) | No |
spark.sql.catalog.<catalog_name>.database | <database> | default | No |
spark.<catalog_name>.write.format | json | arrow | No |
Эти параметры можно задать одним из следующих способов:
- Отредактировать или создать
spark-defaults.conf. - Передать конфигурацию в команду
spark-submit(или в команды CLIspark-shell/spark-sql). - Добавить конфигурацию при инициализации контекста.
При работе с кластером ClickHouse необходимо задать уникальное имя каталога для каждого экземпляра. Например:
Таким образом, вы сможете обращаться к таблице <ck_db>.<ck_table> в clickhouse1 из Spark SQL как clickhouse1.<ck_db>.<ck_table>, а к таблице <ck_db>.<ck_table> в clickhouse2 — как clickhouse2.<ck_db>.<ck_table>.
Настройки ClickHouse Cloud
При подключении к ClickHouse Cloud обязательно включите SSL и задайте необходимый режим SSL. Например:
Чтение данных
- Java
- Scala
- Python
- Spark SQL
Запись данных
- Java
- Scala
- Python
- Spark SQL
Операции DDL
Вы можете выполнять операции DDL в экземпляре ClickHouse с помощью Spark SQL, при этом все изменения немедленно сохраняются в ClickHouse. Spark SQL позволяет писать запросы так же, как и в ClickHouse, поэтому вы можете напрямую выполнять команды, такие как CREATE TABLE, TRUNCATE и другие, без каких-либо изменений, например:
При использовании Spark SQL за один раз может быть выполнен только один оператор.
Приведённые выше примеры демонстрируют запросы Spark SQL, которые вы можете выполнять в своём приложении с использованием любого из API — Java, Scala, PySpark или оболочки.
Конфигурации
Ниже приведены настраиваемые параметры, доступные в коннекторе:
| Ключ | По умолчанию | Описание | Начиная с |
|---|---|---|---|
| spark.clickhouse.ignoreUnsupportedTransform | false | ClickHouse поддерживает использование сложных выражений в качестве ключей шардирования или значений партиционирования, например cityHash64(col_1, col_2), которые в настоящее время не поддерживаются Spark. Если true, неподдерживаемые выражения игнорируются, в противном случае выполнение немедленно завершается с исключением. Обратите внимание, что при включённой настройке spark.clickhouse.write.distributed.convertLocal игнорирование неподдерживаемых ключей шардирования может привести к повреждению данных. | 0.4.0 |
| spark.clickhouse.read.compression.codec | lz4 | Кодек, используемый для распаковки данных при чтении. Поддерживаемые кодеки: none, lz4. | 0.5.0 |
| spark.clickhouse.read.distributed.convertLocal | true | При чтении таблицы Distributed использовать локальную таблицу вместо самой распределённой. Если true, игнорировать spark.clickhouse.read.distributed.useClusterNodes. | 0.1.0 |
| spark.clickhouse.read.fixedStringAs | двоичный | Считывать тип ClickHouse FixedString как заданный тип данных Spark. Поддерживаемые типы: binary, string | 0.8.0 |
| spark.clickhouse.read.format | json | Формат сериализации для чтения. Поддерживаемые форматы: JSON, Binary | 0.6.0 |
| spark.clickhouse.read.runtimeFilter.enabled | false | Включить динамический фильтр при чтении. | 0.8.0 |
| spark.clickhouse.read.splitByPartitionId | true | Если установлено значение true, формировать входной фильтр партиций по виртуальному столбцу _partition_id, а не по значению партиции. Известны проблемы при построении SQL-предикатов по значению партиции. Для использования этой возможности требуется ClickHouse Server v21.6+. | 0.4.0 |
| spark.clickhouse.useNullableQuerySchema | false | Если true, помечать все поля схемы запроса как допускающие значение NULL при выполнении CREATE/REPLACE TABLE ... AS SELECT ... при создании таблицы. Обратите внимание, что эта настройка требует SPARK-43390 (доступно в Spark 3.5); без этого патча данная опция фактически всегда равна true. | 0.8.0 |
| spark.clickhouse.write.batchSize | 10000 | Количество записей в одном пакете при записи в ClickHouse. | 0.1.0 |
| spark.clickhouse.write.compression.codec | lz4 | Кодек, используемый для сжатия данных при их записи. Поддерживаемые кодеки: none, lz4. | 0.3.0 |
| spark.clickhouse.write.distributed.convertLocal | false | При записи в таблицу Distributed данные записываются в локальную таблицу, а не в саму распределённую таблицу. Если установлено значение true, параметр spark.clickhouse.write.distributed.useClusterNodes игнорируется. | 0.1.0 |
| spark.clickhouse.write.distributed.useClusterNodes | true | При записи в распределённую таблицу записывать данные на все узлы кластера. | 0.1.0 |
| spark.clickhouse.write.format | стрелка | Формат сериализации при записи. Поддерживаемые форматы: JSON, Arrow | 0.4.0 |
| spark.clickhouse.write.localSortByKey | true | Если установлено значение true, выполнять локальную сортировку по ключам сортировки перед записью. | 0.3.0 |
| spark.clickhouse.write.localSortByPartition | значение параметра spark.clickhouse.write.repartitionByPartition | Если имеет значение true, выполняется локальная сортировка по разделу перед записью. Если не задано, используется значение spark.clickhouse.write.repartitionByPartition. | 0.3.0 |
| spark.clickhouse.write.maxRetry | 3 | Максимальное количество повторных попыток записи для одной пакетной операции, завершившейся сбоем с кодами ошибок, допускающими повторную попытку. | 0.1.0 |
| spark.clickhouse.write.repartitionByPartition | true | Определяет, нужно ли переразбивать данные по ключам партиционирования ClickHouse, чтобы они соответствовали распределению данных в таблице ClickHouse перед записью. | 0.3.0 |
| spark.clickhouse.write.repartitionNum | 0 | Перед записью требуется перераспределить данные в соответствии с распределением таблицы ClickHouse. Используйте этот параметр конфигурации для указания количества переразбиений; значение меньше 1 означает, что перераспределение не требуется. | 0.1.0 |
| spark.clickhouse.write.repartitionStrictly | false | Если true, Spark будет строго распределять входящие записи по разделам, чтобы обеспечить требуемое распределение перед записью данных в таблицу источника. В противном случае Spark может применять определённые оптимизации для ускорения запроса, но при этом нарушить требуемое распределение. Обратите внимание, что для этой конфигурации необходим патч SPARK-37523 (доступен в Spark 3.4); без этого патча она всегда ведёт себя как true. | 0.3.0 |
| spark.clickhouse.write.retryInterval | 10s | Интервал в секундах между повторными попытками записи. | 0.1.0 |
| spark.clickhouse.write.retryableErrorCodes | 241 | Коды ошибок, допускающих повторную попытку, возвращаемые сервером ClickHouse при сбое записи. | 0.1.0 |
Поддерживаемые типы данных
В этом разделе описывается соответствие типов данных между Spark и ClickHouse. Таблицы ниже служат быстрым справочником по преобразованию типов данных при чтении из ClickHouse в Spark и при вставке данных из Spark в ClickHouse.
Чтение данных из ClickHouse в Spark
| Тип данных ClickHouse | Тип данных Spark | Поддерживается | Примитивный тип | Примечания |
|---|---|---|---|---|
Nothing | NullType | ✅ | Да | |
Bool | BooleanType | ✅ | Да | |
UInt8, Int16 | ShortType | ✅ | Да | |
Int8 | ByteType | ✅ | Да | |
UInt16,Int32 | IntegerType | ✅ | Да | |
UInt32,Int64, UInt64 | LongType | ✅ | Да | |
Int128,UInt128, Int256, UInt256 | DecimalType(38, 0) | ✅ | Да | |
Float32 | FloatType | ✅ | Да | |
Float64 | DoubleType | ✅ | Да | |
String, JSON, UUID, Enum8, Enum16, IPv4, IPv6 | StringType | ✅ | Да | |
FixedString | BinaryType, StringType | ✅ | Да | Определяется настройкой READ_FIXED_STRING_AS |
Decimal | DecimalType | ✅ | Да | Точность и масштаб до Decimal128 |
Decimal32 | DecimalType(9, scale) | ✅ | Да | |
Decimal64 | DecimalType(18, scale) | ✅ | Да | |
Decimal128 | DecimalType(38, scale) | ✅ | Да | |
Date, Date32 | DateType | ✅ | Да | |
DateTime, DateTime32, DateTime64 | TimestampType | ✅ | Да | |
Array | ArrayType | ✅ | Нет | Тип элементов массива также преобразуется |
Map | MapType | ✅ | Нет | Ключи ограничены типом StringType |
IntervalYear | YearMonthIntervalType(Year) | ✅ | Да | |
IntervalMonth | YearMonthIntervalType(Month) | ✅ | Да | |
IntervalDay, IntervalHour, IntervalMinute, IntervalSecond | DayTimeIntervalType | ✅ | Нет | Используется соответствующий тип интервала |
Object | ❌ | |||
Nested | ❌ | |||
Tuple | StructType | ✅ | Нет | Поддерживает как именованные, так и неименованные кортежи. Именованные кортежи сопоставляются с полями структуры по имени, неименованные используют _1, _2 и т. д. Поддерживаются вложенные структуры и Nullable-поля |
Point | ❌ | |||
Polygon | ❌ | |||
MultiPolygon | ❌ | |||
Ring | ❌ | |||
IntervalQuarter | ❌ | |||
IntervalWeek | ❌ | |||
Decimal256 | ❌ | |||
AggregateFunction | ❌ | |||
SimpleAggregateFunction | ❌ |
Вставка данных из Spark в ClickHouse
| Тип данных Spark | Тип данных ClickHouse | Поддерживается | Примитивный | Примечания |
|---|---|---|---|---|
BooleanType | Bool | ✅ | Да | Отображается в тип Bool (а не UInt8) начиная с версии 0.9.0 |
ByteType | Int8 | ✅ | Да | |
ShortType | Int16 | ✅ | Да | |
IntegerType | Int32 | ✅ | Да | |
LongType | Int64 | ✅ | Да | |
FloatType | Float32 | ✅ | Да | |
DoubleType | Float64 | ✅ | Да | |
StringType | String | ✅ | Да | |
VarcharType | String | ✅ | Да | |
CharType | String | ✅ | Да | |
DecimalType | Decimal(p, s) | ✅ | Да | Точность и масштаб до Decimal128 |
DateType | Date | ✅ | Да | |
TimestampType | DateTime | ✅ | Да | |
ArrayType (list, tuple, or array) | Array | ✅ | Нет | Тип элементов массива также преобразуется |
MapType | Map | ✅ | Нет | Ключи ограничены типом StringType |
StructType | Tuple | ✅ | Нет | Преобразуется в именованный Tuple с именами полей. |
VariantType | VariantType | ❌ | Нет | |
Object | ❌ | |||
Nested | ❌ |
Участие и поддержка
Если вы хотите внести вклад в проект или сообщить о каких-либо проблемах, мы будем рады вашей обратной связи! Посетите наш репозиторий на GitHub, чтобы создать issue, предложить улучшения или отправить pull request. Мы рады любому вкладу! Прежде чем начать, ознакомьтесь с руководством по внесению изменений в репозитории. Спасибо, что помогаете делать наш коннектор ClickHouse для Spark лучше!