아파치 Avro 데이터 소스 가이드
- 배포하기
- 불러오기와 저장하기 함수
- to_avro() 와 from_avro()
- 데이터 소스 옵션
- 설정
- Databricks spark-avro와의 호환성
- 지원되는 Avro -> Spark SQL 변환 타입
- 지원되는 Spark SQL -> Avro 변환 타입
스파크 2.4 이후, 스파크 SQL은 아파치 Avro 데이터 읽기와 쓰기 기능을 내장하고 있습니다.
배포하기
spark-avro
는 외부 모듈이기 때문에 spark-submit
or spark-shell
에 기본적으로 포함되어 있지 않습니다.
다른 스파크 응용 프로그램과 마찬가지로 spark-submit
은 응용 프로그램을 실행하기 위해 사용됩니다. avro 기능을 사용하기 위해 필요한 spark-avro_2.12
모듈과 이 모듈이 필요로 하는 다른 의존 라이브러리는 다음과 같이 --packages
를 이용하여 spark-submit
에 추가될 수 있습니다.
./bin/spark-submit --packages org.apache.spark:spark-avro_2.12:2.4.3 ...
spark-shell
에서 해당 기능을 사용하기 위해서는, --packages
를 이용하여 org.apache.spark:spark-avro_2.11
와 이에 따르는 의존 라이브러리를 추가할 수 있습니다.
./bin/spark-shell --packages org.apache.spark:spark-avro_2.12:2.4.3 ...
외부 의존성이 있는 응용 프로그램 실행에 대한 자세한 내용은 응용 프로그램 실행 안내를 참조하세요.
불러오기와 저장하기 함수
spark-avro
는 외부 모듈이기 때문에 DataFrameReader
나 DataFrameWriter
에 .avro
API 가 없습니다.
Avro 형식으로 데이터를 불러오거나 저장하기 위해서는 데이터 소스 옵션 format
을 avro
(또는 org.apache.spark.sql.avro
)로 지정해야 합니다.
to_avro() 와 from_avro()
Avro 패키지는 컬럼을 Avro 형식의 바이너리로 인코드하는 to_avro
함수와 Avro 바이너리 데이터를 컬럼으로 디코드하는 from_avro()
함수를 제공합니다. 두 함수는 하나의 컬럼을 다른 컬럼으로 변환하는 역할을 합니다. 해당 입력/출력 SQL 데이터 타입은 복합(complex) 타입과 기본 타입 모두가 될 수 있습니다.
Avro 레코드를 컬럼으로 사용하는 것은 Kafka와 같은 스트리밍 소스를 읽거나 쓸 때 유용합니다. 각 Kafka key-value 레코드는 Kafka의 timestamp나 Kafka의 offset과 같은 메타데이터와 합쳐지게 됩니다.
- 데이터를 포함하는 “value” 필드가 Avro로 되어있다면,
from_avro()
를 사용해서 데이터를 추출하고, 확장하고, 정제한 다음 다시 Kafka로 downstream에 넣거나 파일로 쓸 수 있습니다. to_avro()
은 구조체(struct)를 Avro 레코드로 바꾸는데 사용할 수 있습니다. 이 방법은 데이터를 Kafka로 쓸 때 여러 컬럼을 하나의 컬럼으로 다시 인코딩하고 싶은 경우 특히 유용합니다.
두 함수는 현재 Scala와 Java에서만 쓸 수 있습니다.
데이터 소스 옵션
Avro의 데이터 소스 옵션은 DataFrameReader
이나 DataFrameWriter
에 있는 .option
메소드를 이용하여 설정할 수 있습니다.
속성 이름 | 기본값 | 의미 | 적용 범위 |
---|---|---|---|
avroSchema |
(없음) | 사용자가 JSON 형식으로 지정하는 Avro 스키마입니다(선택). 레코드 필드의 날짜 타입과 이름은 입력된 Avro 데이터나 Catalyst 데이터와 일치 해야 합니다. 그렇지 않을 경우 읽기/쓰기 작업이 실패합니다 | 읽기, 쓰기 |
recordName |
topLevelRecord | Avro 스펙에서 요구하는 쓰기 결과의 최상위 레코드 이름. | 쓰기 |
recordNamespace |
"" | 쓰기 결과의 레코드 네임 스페이스(namespace). | 쓰기 |
ignoreExtension |
true | 이 옵션은 읽기에서 .avro 확장자가 없는 파일을 무시할지를 결정합니다.이 옵션이 활성화되면 모든 파일( .avro 확장자가 있는 파일과 없는 파일 모두)을 불러옵니다. |
읽기 |
compression |
snappy | compression 옵션은 쓰기에 사용되는 압축 코덱을 지정하도록 합니다.현재 지원되는 코덱은 uncompressed , snappy , deflate , bzip2 와 xz 입니다.옵션이 설정되어 있지 않다면, 설정값 spark.sql.avro.compression.codec 이 적용됩니다. |
쓰기 |
설정
Avro의 설정은 SparkSession의 setConf
메소드를 이용하거나 SQL로 SET key=value
명령을 실행해 구성할 수 있습니다.
속성 이름 | 기본값 | 의미 |
---|---|---|
spark.sql.legacy.replaceDatabricksSparkAvro.enabled | true | true로 설정되면, 데이터 소스 제공자 com.databricks.spark.avro 는 하위 호환성을 위해 내장된 외부 모듈인 Avro 데이터 소스 모듈에 매핑됩니다. |
spark.sql.avro.compression.codec | snappy | AVRO 파일을 쓰는데 사용되는 압축 코덱. 지원 코덱: uncompressed, deflate, snappy, bzip2 and xz. 기본 코덱은 snappy입니다. |
spark.sql.avro.deflate.level | -1 | AVRO 파일을 쓰는데 사용되는 deflate 코덱의 압축 레벨입니다. 유효한 값은 1 이상 9 이하 또는 -1 이여야 합니다. 기본값은 -1로 현재 구현에서는 6 레벨에 해당합니다. (역자 주: gzip 등에 사용되는 deflate 압축 코덱은 1 부터 9까지의 압축 레벨을 가지며, -1로 지정될 경우에는 현재 사용중인 구현체의 기본값을 사용한다. 그리고 스파크 2.4 기준으로 그 '기본값'이 6이다.) |
Databricks spark-avro와의 호환성
이 Avro 데이터 소스 모듈은 원래 Databricks가 제작하여 독립된 오픈 소스 저장소를 통해 공개된 모듈인 https://github.com/databricks/spark-avro에서 비롯되었으며, 자연히 이 모듈과 호환됩니다. 여기서는 2.4 이전부터 spark-avro를 사용해 온 경우 호환성을 유지하기 위한 방법에 대해 설명합니다.
SQL 설정 spark.sql.legacy.replaceDatabricksSparkAvro.enabled
은 켜져 있도록 기본값이 잡혀 있습니다. 그리고 이 데이터 소스 제공자 com.databricks.spark.avro
는 내장된 Avro 모듈에 매핑되도록 구현되어 있습니다. 카탈로그 메타 스토어에 Provider 속성이 com.databricks.spark.avro
로 잡혀 있는 스파크 테이블의 경우, 내장 Avro 모듈을 사용하고 있다면 이 테이블을 불러오기 위한 매핑이 필수입니다.
Databricks의 spark-avro에서 implicit class인 AvroDataFrameWriter
와 AvroDataFrameReader
는 .avro()
라는 단축 함수를 위해 만들어졌습니다. 이 내장된 외부 모듈에서, 두 implicit class는 삭제됩니다. 대신 DataFrameWriter
이나 DataFrameReader
의 .format("avro")
를 사용할 수 있습니다.
여러분이 직접 빌드한 spark-avro
jar 파일을 사용하고 싶다면, 간단하게 spark.sql.legacy.replaceDatabricksSparkAvro.enabled
설정을 비활성화 하고 애플리케이션을 배포할 때 --jars
를 이용하면 됩니다. 자세한 내용은 애플리케이션 제출 가이드의 고급 의존성 관리 섹션을 참고하세요.
지원되는 Avro -> Spark SQL 변환 타입
현재 스파크는 Avro 레코드의 모든 기본 타입 및 복합 타입을 읽을 수 있습니다.
Avro 타입 | 스파크 SQL 타입 |
---|---|
boolean | BooleanType |
int | IntegerType |
long | LongType |
float | FloatType |
double | DoubleType |
string | StringType |
enum | StringType |
fixed | BinaryType |
bytes | BinaryType |
record | StructType |
array | ArrayType |
map | MapType |
union | See below |
위에 나열된 타입 이외에도 union
타입을 읽는 것을 지원합니다. 다음의 세 타입은 기본 union
타입으로 간주됩니다:
union(int, long)
은 LongType에 매핑됩니다.union(float, double)
은 DoubleType에 매핑됩니다.union(something, null)
(여기서 something은 모든 지원되는 Avro 타입입니다)은 같은 nullable 설정이 true로 설정된 something의 스파크 SQL 타입에 매핑됩니다. 이 외의 모든 union 타입은 복합 타입으로 간주됩니다. 이는 StructType에 매핑되며 필드 이름은 union 멤버의 이름에 따라 member0, member1, … 와 같이 정해집니다. 이는 Avro와 Parquet 사이의 변환 동작과 같습니다.
다음과 같이 Avro 논리 타입을 읽는 것도 지원합니다.
Avro 논리 타입 | Avro 타입 | 스파크 SQL 타입 |
---|---|---|
date | int | DateType |
timestamp-millis | long | TimestampType |
timestamp-micros | long | TimestampType |
decimal | fixed | DecimalType |
decimal | bytes | DecimalType |
현재 버전에서 Avro 파일에 정의되어 있는 docs, aliases 그리고 기타 다른 속성들은 모두 무시됩니다.
지원되는 Spark SQL -> Avro 변환 타입
스파크는 모든 스파크 SQL 타입을 Avro로 쓰는 것을 지원합니다. 대부분의 경우 각각의 스파크 타입에서 Avro 타입으로의 변환은 직관적입니다만 (예. IntegerType을 int로 변환), 아래 목록과 같이 몇 가지 특별한 경우가 존재합니다.
Spark SQL type | Avro 타입 | Avro 논리 타입 |
---|---|---|
ByteType | int | |
ShortType | int | |
BinaryType | bytes | |
DateType | int | date |
TimestampType | long | timestamp-micros |
DecimalType | fixed | decimal |
스파크 SQL 타입이 Avro 타입으로 변환될 수 있도록 avroSchema
옵션을 사용하여 전체 출력 Avro 스키마를 지정할 수도 있습니다. 아래 타입들은 기본값이 적용되지 않기 때문에, 사용자가 반드시 지정해 줘야 합니다.
Spark SQL type | Avro type | Avro logical type |
---|---|---|
BinaryType | fixed | |
StringType | enum | |
TimestampType | long | timestamp-millis |
DecimalType | bytes | decimal |