Skip to main content

CSV Format

Format: Serialization SchemaFormat: Deserialization Schema

CSV Format 允许我们基于 CSV schema 进行解析和生成 CSV 数据。 目前 CSV schema 是基于 table schema 推断而来的。

依赖

如何创建使用 CSV 格式的表

以下是一个使用 Kafka 连接器和 CSV 格式创建表的示例。

CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'csv',
'csv.ignore-parse-errors' = 'true',
'csv.allow-comments' = 'true'
)

Format 参数

参数是否必选默认值类型描述
format
必选(none)String指定要使用的格式,这里应该是 'csv'
csv.field-delimiter
可选,String字段分隔符 (默认','),必须为单字符。你可以使用反斜杠字符指定一些特殊字符,例如 '\t' 代表制表符。 你也可以通过 unicode 编码在纯 SQL 文本中指定一些特殊字符,例如 'csv.field-delimiter' = U&'\0001' 代表 0x01 字符。
csv.disable-quote-character
可选falseBoolean是否禁止对引用的值使用引号 (默认是 false). 如果禁止,选项 'csv.quote-character' 不能设置。
csv.quote-character
可选"String用于围住字段值的引号字符 (默认").
csv.allow-comments
可选falseBoolean是否允许忽略注释行(默认不允许),注释行以 '#' 作为起始字符。 如果允许注释行,请确保 csv.ignore-parse-errors 也开启了从而允许空行。
csv.ignore-parse-errors
可选falseBoolean当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null
csv.array-element-delimiter
可选;String分隔数组和行元素的字符串(默认';').
csv.escape-character
可选(none)String转义字符(默认关闭).
csv.null-literal
可选(none)String是否将 "null" 字符串转化为 null 值。

数据类型映射

目前 CSV 的 schema 都是从 table schema 推断而来的。显式地定义 CSV schema 暂不支持。 Flink 的 CSV Format 数据使用 jackson databind API 去解析 CSV 字符串。

下面的表格列出了flink数据和CSV数据的对应关系。

Flink SQL 类型CSV 类型
CHAR / VARCHAR / STRINGstring
BOOLEANboolean
BINARY / VARBINARYstring with encoding: base64
DECIMALnumber
TINYINTnumber
SMALLINTnumber
INTnumber
BIGINTnumber
FLOATnumber
DOUBLEnumber
DATEstring with format: date
TIMEstring with format: time
TIMESTAMPstring with format: date-time
INTERVALnumber
ARRAYarray
ROWobject