gpkafka configuration file (version 1).
Synopsis
DATABASE: <db_name>
USER: <user_name>
PASSWORD: <password>
HOST: <host>
PORT: <greenplum_port>
[VERSION: 1]
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: <kafka_broker_host:broker_port> [, ... ]
         TOPIC: <kafka_topic>
      [COLUMNS:
         - NAME: { <column_name> | __IGNORED__ }
           TYPE: <column_data_type>
         [ ... ]]
      FORMAT: <data_format>
      [[DELIMITED_OPTION:
         DELIMITER: <delimiter_string>] |
      [AVRO_OPTION:
         [SCHEMA_REGISTRY_ADDR: <http://schemareg_host:schemareg_port> [, ... ]]
         [BYTES_TO_BASE64: <boolean>]] |
      [CUSTOM_OPTION:
         NAME: <udf_name>
         PARAMSTR: <udf_parameter_string>]]
      [FILTER: <filter_string>]
      [ERROR_LIMIT: { <num_errors> | <percentage_errors> }]
   OUTPUT:
      [SCHEMA: <output_schema_name>]
      TABLE: <table_name>
      [MODE: <mode>]
      [MATCH_COLUMNS:
         - <match_column_name>
         [ ... ]]
      [ORDER_COLUMNS:
         - <order_column_name>
         [ ... ]]
      [UPDATE_COLUMNS:
         - <update_column_name>
         [ ... ]]
      [UPDATE_CONDITION: <update_condition>]
      [DELETE_CONDITION: <delete_condition>]
      [MAPPING:
         - NAME: <target_column_name>
           EXPRESSION: { <source_column_name> | <expression> }
         [ ... ]
         |
         <target_column_name> : { <source_column_name> | <expression> }
         [ ... ] ]
   [METADATA:
      [SCHEMA: <metadata_schema_name>]]
   COMMIT:
      MAX_ROW: <num_rows>
      MINIMAL_INTERVAL: <wait_time>
   [POLL:
      BATCHSIZE: <num_records>
      TIMEOUT: <poll_time>]
   [TASK:
      POST_BATCH_SQL: <udf_or_sql_to_run>
      BATCH_INTERVAL: <num_batches>]
   [PROPERTIES:
      <kafka_property_name>: <kafka_property_value>
      [ ... ]]
Description
The gpkafka.yaml Version 1 configuration file format is deprecated and may be removed in a future release. Use the version 2 or version 3 (Beta) configuration file format to configure a Kafka load job.
You specify load configuration parameters for the gpkafka utilities in a YAML-formatted configuration file. (This reference page uses the name gpkafka.yaml when referring to this file; you may choose your own name for the file.) Load parameters include Greenplum Database connection and target table information, Kafka broker and topic information, and error and commit thresholds.
Version 1 of the gpkafka.yaml configuration file syntax does not support KEY and VALUE blocks.
The gpkafka utility processes the YAML configuration file in order, using indentation (spaces) to determine the document hierarchy and the relationships between the sections. The use of white space in the file is significant, and keywords are case-sensitive.
Keywords and Values
Greenplum Database Connection Options
- DATABASE: db_name
- The name of the Greenplum database.
- USER: user_name
- The name of the Greenplum Database user/role. This user_name must have permissions as described in the Greenplum Streaming Server documentation.
- PASSWORD: password
- The password for the Greenplum Database user/role.
- HOST: host
- The host name or IP address of the Greenplum Database coordinator host.
- PORT: greenplum_port
- The port number of the Greenplum Database server on the coordinator host.
- VERSION: 1
- Optional. The version of the load configuration file. The default version is Version 1.
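For example, a connection block might look like the following; the database name, role, password, and host shown here are placeholders for illustration, not values from this reference:
DATABASE: testdb
USER: gpadmin
PASSWORD: changeme    # placeholder credentials
HOST: mdw
PORT: 5432
VERSION: 1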
KAFKA:INPUT: Options
- SOURCE
- Kafka input configuration parameters.
- BROKERS: kafka_broker_host:broker_port
- The host and port identifying the Kafka broker.
- TOPIC: kafka_topic
- The name of the Kafka topic from which to load data. The topic must exist.
- COLUMNS:
- The column names and data types. You must specify all Kafka data elements in the order in which they appear in the Kafka message. Optional when the column names and types match the target Greenplum Database table definition.
  The default source-to-target data mapping behaviour of GPSS is to match a column name as defined in COLUMNS:NAME with a column name in the target Greenplum Database TABLE. You can override the default mapping by specifying a MAPPING block.
- NAME: column_name
- The name of a column. column_name must match the column name of the target Greenplum Database table. Specify __IGNORED__ to omit this Kafka message data element from the load operation.
- TYPE: data_type
- The data type of the column. You must specify an equivalent data type for each non-ignored Kafka message data element and the associated Greenplum Database table column.
- FORMAT: data_format
- The format of the Kafka message value data. You may specify a FORMAT of avro, binary, csv, custom, delimited, or json.
- avro
- When you specify the avro data format, you must define only a single json type column in COLUMNS. If the Kafka message value schema is registered in a Confluent Schema Registry, you must also provide the AVRO_OPTION.
- binary
- When you specify the binary data format, you must define only a single bytea type column in COLUMNS.
- csv
- When you specify the csv data format, the message content cannot contain line ending characters (CR and LF).
- custom
- When you specify the custom data format, you must provide a CUSTOM_OPTION.
- delimited
- When you specify the delimited data format, you must provide a DELIMITED_OPTION.
- json
- When you specify the json data format, you must define only a single json type column in COLUMNS.
- AVRO_OPTION
- Optional. When you specify avro as the FORMAT, you may provide AVRO_OPTIONs that identify a schema registry location and whether or not you want GPSS to convert Avro bytes fields into base64-encoded strings.
- SCHEMA_REGISTRY_ADDR: http://schemareg_host:schemareg_port
- Optional. When you specify avro as the FORMAT and the Avro schema of the JSON data you want to load is registered in the Confluent Schema Registry, you must identify the host name and port number of each Confluent Schema Registry server in your Kafka cluster. You may specify more than one address, and at least one of the addresses must be legal.
- BYTES_TO_BASE64: boolean
- When true, GPSS converts Avro bytes fields into base64-encoded strings. The default value is false, in which case GPSS does not perform the conversion.
- CUSTOM_OPTION
- Optional. When you specify custom as the FORMAT, CUSTOM_OPTION is required. This block identifies the name and the arguments of a custom formatter user-defined function.
- NAME: udf_name
- The name of the custom formatter user-defined function.
- PARAMSTR: udf_parameter_string
- A string specifying the comma-separated list of arguments to pass to the custom formatter user-defined function.
- DELIMITED_OPTION:DELIMITER: delimiter_string
- Optional. When you specify delimited as the FORMAT, delimiter_string is required and must identify the Kafka message data element delimiter. delimiter_string may be a multi-byte value up to 32 bytes in length. It may not contain quote or escape characters.
- FILTER: filter_string
- The filter to apply to the Kafka input messages before GPSS loads the data into Greenplum Database. If the filter evaluates to true, GPSS loads the message. If the filter evaluates to false, the message is dropped. filter_string must be a valid SQL conditional expression and may reference one or more COLUMNS names (see the sketch following this list).
- ERROR_LIMIT: { num_errors | percentage_errors }
- The error threshold, specified as either an absolute number or a percentage. gpkafka load exits when this limit is reached. The default ERROR_LIMIT is zero; GPSS deactivates error logging and stops the load operation when it encounters the first error. Due to a limitation of the Greenplum Database external table framework, GPSS does not accept ERROR_LIMIT: 1.
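For illustration only, the following INPUT sketch reads three-field CSV messages, ignores the third field, filters on two of the columns, and tolerates up to 10 errors; the broker, topic, and column names are assumptions, not values from this reference:
INPUT:
   SOURCE:
      BROKERS: kbrokerhost1:9092
      TOPIC: web_events            # hypothetical topic
   COLUMNS:
      - NAME: event_id
        TYPE: int
      - NAME: status
        TYPE: text
      - NAME: __IGNORED__          # drop the third CSV field
        TYPE: text
   FORMAT: csv
   FILTER: status = 'active' AND event_id > 0
   ERROR_LIMIT: 10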
KAFKA:OUTPUT: Options
- SCHEMA: output_schema_name
- The name of the Greenplum Database schema in which table_name resides. Optional; the default schema is the public schema.
- TABLE: table_name
- The name of the Greenplum Database table into which GPSS loads the Kafka data.
- MODE: mode
- The table load mode. Valid mode values are INSERT, MERGE, or UPDATE. The default value is INSERT. (A MERGE example appears in the sketch following this list.)
- INSERT
- Inserts the Kafka data into the Greenplum Database table.
- UPDATE
- Updates the target table columns that are listed in UPDATE_COLUMNS when the input columns identified in MATCH_COLUMNS match the named target table columns and the optional UPDATE_CONDITION is true.
  UPDATE is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
- MERGE
- Inserts new rows and updates existing rows when:
  - columns are listed in UPDATE_COLUMNS,
  - the MATCH_COLUMNS target table column values are equal to the input data, and
  - an optional UPDATE_CONDITION is specified and met.
  Deletes rows when:
  - the MATCH_COLUMNS target table column values are equal to the input data, and
  - an optional DELETE_CONDITION is specified and met.
  New rows are identified when the MATCH_COLUMNS value in the source data does not have a corresponding value in the existing data of the target table. In those cases, the entire row from the source file is inserted, not only the MATCH_COLUMNS and UPDATE_COLUMNS. If there are multiple new MATCH_COLUMNS values in the input data that are the same, GPSS inserts or updates the target table using a random matching input row. When you specify ORDER_COLUMNS, GPSS sorts the input data on the specified column(s) and inserts or updates from the input row with the largest value.
  MERGE is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.
- MATCH_COLUMNS:
- Required if MODE is MERGE or UPDATE.
- match_column_name
- Specifies the column(s) to use as the join condition for the update. The attribute value in the specified target column(s) must be equal to that of the corresponding source data column(s) in order for the row to be updated in the target table.
- ORDER_COLUMNS:
- Optional. May be specified in MERGE MODE to sort the input data rows.
- order_column_name
- Specify the column(s) by which GPSS sorts the rows. When multiple matching rows exist in a batch, ORDER_COLUMNS is used with MATCH_COLUMNS to determine the input row with the largest value; GPSS uses that row to write/update the target.
- UPDATE_COLUMNS:
- Required if MODE is MERGE or UPDATE.
- update_column_name
- Specifies the column(s) to update for the rows that meet the MATCH_COLUMNS criteria and the optional UPDATE_CONDITION.
- UPDATE_CONDITION: update_condition
- Optional. Specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met in order for a row in the target table to be updated (or inserted, in the case of a MERGE).
- DELETE_CONDITION: delete_condition
- Optional. In MERGE MODE, specifies a boolean condition, similar to that which you would declare in a WHERE clause, that must be met for GPSS to delete rows in the target table that meet the MATCH_COLUMNS criteria.
- MAPPING:
- Optional. Overrides the default source-to-target column mapping. GPSS supports two mapping syntaxes.
  When you specify a MAPPING, ensure that you provide a mapping for all Kafka data elements of interest. GPSS does not automatically match column names when you provide a MAPPING.
- NAME: target_column_name
- Specifies the target Greenplum Database table column name.
- EXPRESSION: { source_column_name | expression }
- Specifies a Kafka COLUMNS:NAME (source_column_name) or an expression. When you specify an expression, you may provide a value expression that you would specify in the SELECT list of a query, such as a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so on.
- target_column_name: { source_column_name | expression }
- When you use this MAPPING syntax, specify the target_column_name and { source_column_name | expression } as described above.
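For illustration only, the following OUTPUT sketch combines MERGE mode with the shorthand mapping syntax; the table and column names, and the op_type-based delete condition, are assumptions, not values from this reference. The right-hand side of each mapping entry refers to a name defined in the corresponding COLUMNS block:
OUTPUT:
   SCHEMA: public
   TABLE: accounts                 # hypothetical target table
   MODE: MERGE
   MATCH_COLUMNS:
      - account_id
   ORDER_COLUMNS:
      - event_time
   UPDATE_COLUMNS:
      - balance
   DELETE_CONDITION: op_type = 'delete'
   MAPPING:
      account_id: account_id
      event_time: event_time
      balance: balance
      op_type: op_type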
KAFKA:METADATA: Options
- SCHEMA: metadata_schema_name
- The name of the Greenplum Database schema in which GPSS creates external and history tables. The default metadata_schema_name is KAFKA:OUTPUT:SCHEMA.
Greenplum Database COMMIT: Options
- COMMIT:
- Controls how gpkafka load commits data to Greenplum Database. You must specify one of MAX_ROW or MINIMAL_INTERVAL. You may specify both configuration parameters as long as both values are not zero (0). Try setting and tuning MINIMAL_INTERVAL to your environment; introduce a MAX_ROW setting only if you encounter high memory usage associated with message buffering (see the sketch following this list).
- MAX_ROW: num_rows
- The number of rows to batch before triggering an INSERT operation on the Greenplum Database table. The default value of MAX_ROW is 0, which instructs GPSS to ignore this commit trigger condition.
- MINIMAL_INTERVAL: wait_time
- The minimum amount of time to wait (milliseconds) between each INSERT operation on the table. The default value is 0, which instructs GPSS to wait forever.
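For example, a COMMIT block that commits at least every two seconds and also caps the number of buffered rows might look like this; the values shown are illustrative, not recommendations:
COMMIT:
   MINIMAL_INTERVAL: 2000   # commit at least every 2 seconds
   MAX_ROW: 10000           # also commit after 10000 buffered rows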
Kafka POLL: Options
The POLL properties are deprecated and ignored by GPSS.
- POLL:
- Controls the polling time period and batch size when reading Kafka data.
- BATCHSIZE: num_records
- The number of Kafka records in a batch. BATCHSIZE must be smaller than COMMIT:MAX_ROW. The default batch size is 200.
- TIMEOUT: poll_time
- The maximum time, in milliseconds, to wait in a polling cycle if Kafka data is not available. You must specify a TIMEOUT greater than 100 milliseconds and less than COMMIT:MINIMAL_INTERVAL. The default poll timeout is 1000 milliseconds.
Greenplum Database TASK: Options
- TASK:
- Controls the execution and scheduling of a periodic (maintenance) task.
- POST_BATCH_SQL: udf_or_sql_to_run
- The user-defined function or SQL command(s) that you want to run after the specified number of batches are read from Kafka. The default is null.
- BATCH_INTERVAL: num_batches
- The number of batches to read before running udf_or_sql_to_run. The default batch interval is 0.
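For example, a hypothetical TASK block that invokes a maintenance user-defined function after every 10 batches; the function name is an assumption for illustration:
TASK:
   POST_BATCH_SQL: SELECT refresh_expense_summary()   # hypothetical UDF
   BATCH_INTERVAL: 10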
Kafka PROPERTIES: Options
- PROPERTIES:
- Kafka consumer configuration property names and values.
- kafka_property_name
- The name of a Kafka property.
- kafka_property_value
- The Kafka property value.
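For example, a hypothetical PROPERTIES block; the property name shown is a standard Kafka consumer setting used here as an assumption, not a value required by this reference:
PROPERTIES:
   session.timeout.ms: 30000   # example consumer property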
Notes
If you created a database object name using a double-quoted identifier (delimited identifier), you must specify the delimited name within single quotes in the gpkafka.yaml configuration file. For example, if you create a table as follows:
CREATE TABLE "MyTable" ("MyColumn" text);
Your gpkafka.yaml configuration file would refer to the above table and column names as:
COLUMNS:
  - name: '"MyColumn"'
    type: text
OUTPUT:
  TABLE: '"MyTable"'
GPSS requires Kafka version 0.11 or newer for exactly-once delivery assurance. You can run with an older version of Kafka (but lose the exactly-once guarantee) by adding the following PROPERTIES block to your gpkafka.yaml load configuration file:
PROPERTIES:
  api.version.request: false
  broker.version.fallback: 0.8.2.1
Examples
Load data from Kafka as defined in the Version 1 configuration file named kafka2greenplum.yaml:
gpkafka load kafka2greenplum.yaml
Example kafka2greenplum.yaml configuration file:
DATABASE: ops
USER: gpadmin
HOST: mdw-1
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: kbrokerhost1:9092
         TOPIC: customer_expenses
      COLUMNS:
         - NAME: cust_id
           TYPE: int
         - NAME: month
           TYPE: int
         - NAME: expenses
           TYPE: decimal(9,2)
      FORMAT: delimited
      DELIMITED_OPTION:
         DELIMITER: '|'
      ERROR_LIMIT: 25
   OUTPUT:
      SCHEMA: payables
      TABLE: expenses
      MAPPING:
         - NAME: customer_id
           EXPRESSION: cust_id
         - NAME: newcust
           EXPRESSION: cust_id > 5000000
         - NAME: expenses
           EXPRESSION: expenses
         - NAME: tax_due
           EXPRESSION: expenses * .0725
   METADATA:
      SCHEMA: gpkafka_internal
   COMMIT:
      MINIMAL_INTERVAL: 2000