gpkafka configuration file (version 2).
Synopsis
DATABASE: <db_name>
USER: <user_name>
PASSWORD: <password>
HOST: <host>
PORT: <greenplum_port>
VERSION: 2
KAFKA:
INPUT:
SOURCE:
BROKERS: <kafka_broker_host:broker_port> [, ... ]
TOPIC: <kafka_topic>
[PARTITIONS: (<partition_numbers>)]
[FALLBACK_OFFSET: { earliest | latest }]
[VALUE:
COLUMNS:
- NAME: { <column_name> | __IGNORED__ }
TYPE: <column_data_type>
[ ... ]
FORMAT: <value_data_format>
[[DELIMITED_OPTION:
DELIMITER: <delimiter_string>
[EOL_PREFIX: <prefix_string>]
[QUOTE: <quote_char>]
[ESCAPE: <escape_char>] ] |
[AVRO_OPTION:
[SCHEMA_REGISTRY_ADDR: <http://schemareg_host:schemareg_port> [, ... ]]
[SCHEMA_CA_ON_GPDB: <sr_ca_file_path>]
[SCHEMA_CERT_ON_GPDB: <sr_cert_file_path>]
[SCHEMA_KEY_ON_GPDB: <sr_key_file_path>]
[SCHEMA_MIN_TLS_VERSION: <minimum_version>]
[SCHEMA_PATH_ON_GPDB: <path_to_file>]r
[BYTES_TO_BASE64: <boolean>]] |
[CSV_OPTION:
[DELIMITER: <delim_char>]
[QUOTE: <quote_char>]
[NULL_STRING: <nullstr_val>]
[ESCAPE: <escape_char>]
[FORCE_NOT_NULL: <columns>]
[FILL_MISSING_FIELDS: <boolean>]] |
[JSONL_OPTION:
[NEWLINE: <newline_str>]] |
[CUSTOM_OPTION:
NAME: <udf_name>
PARAMSTR: <udf_parameter_string>]]
[KEY:
COLUMNS:
- NAME: { <column_name> | __IGNORED__ }
TYPE: <column_data_type>
[ ... ]
FORMAT: <key_data_format>
[[DELIMITED_OPTION:
DELIMITER: <delimiter_string> |
[EOL_PREFIX: <prefix_string>]
[QUOTE: <quote_char>]
[ESCAPE: <escape_char>] ] |
[AVRO_OPTION:
[SCHEMA_REGISTRY_ADDR: <http://schemareg_host:schemareg_port> [, ... ]]
[SCHEMA_CA_ON_GPDB: <sr_ca_file_path>]
[SCHEMA_CERT_ON_GPDB: <sr_cert_file_path>]
[SCHEMA_KEY_ON_GPDB: <sr_key_file_path>]
[SCHEMA_MIN_TLS_VERSION: <minimum_version>]
[SCHEMA_PATH_ON_GPDB: <path_to_file>]
[BYTES_TO_BASE64: <boolean>]] |
[CSV_OPTION:
[DELIMITER: <delim_char>]
[QUOTE: <quote_char>]
[NULL_STRING: <nullstr_val>]
[ESCAPE: <escape_char>]
[FORCE_NOT_NULL: <columns>]
[FILL_MISSING_FIELDS: <boolean>] |
[CUSTOM_OPTION:
NAME: <udf_name>
PARAMSTR: <udf_parameter_string>]]
[META:
COLUMNS:
- NAME: <meta_column_name>
TYPE: { json | jsonb }
FORMAT: json]
[TRANSFORMER:
PATH: <path_to_plugin_transform_library>
ON_INIT: <plugin_transform_init_name>
TRANSFORM: <plugin_transform_name>
PROPERTIES:
<plugin_transform_property_name>: <property_value>
[ ... ] ]
[FILTER: <filter_string>]
[ENCODING: <char_set>]
[ERROR_LIMIT: { <num_errors> | <percentage_errors> }]
{ OUTPUT:
[SCHEMA: <output_schema_name>]
TABLE: <table_name>
[FILTER: <output_filter_string>]
[MODE: <mode>]
[MATCH_COLUMNS:
- <match_column_name>
[ ... ]]
[ORDER_COLUMNS:
- <order_column_name>
[ ... ]]
[UPDATE_COLUMNS:
- <update_column_name>
[ ... ]]
[UPDATE_CONDITION: <update_condition>]
[DELETE_CONDITION: <delete_condition>]
[TRANSFORMER:
TRANSFORM: <udf_transform_udf_name>
PROPERTIES:
<udf_transform_property_name>: <property_value>
[ ... ]
COLUMNS:
- <udf_transform_column_name>
[ ... ] ]
[MAPPING:
- NAME: <target_column_name>
EXPRESSION: { <source_column_name> | <expression> }
[ ... ]
|
<target_column_name> : { <source_column_name> | <expression> }
[ ... ] ] |
OUTPUTS:
- TABLE: <table_name>
[MODE: <mode>]
[MATCH_COLUMNS:
- <match_column_name>
[ ... ]]
[ORDER_COLUMNS:
- <order_column_name>
[ ... ]]
[UPDATE_COLUMNS:
- <update_column_name>
[ ... ]]
[UPDATE_CONDITION: <update_condition>]
[DELETE_CONDITION: <delete_condition>]
[TRANSFORMER:
TRANSFORM: <udf_transform_udf_name>
PROPERTIES:
<udf_transform_property_name>: <property_value>
[ ... ]
COLUMNS:
- <udf_transform_column_name>
[ ... ] ]
[MAPPING:
- NAME: <target_column_name>
EXPRESSION: { <source_column_name> | <expression> }
[ ... ]
|
<target_column_name> : { <source_column_name> | <expression> }
[ ... ] ]
[...] }
[METADATA:
[SCHEMA: <metadata_schema_name>]]
COMMIT:
SAVE_FAILING_BATCH: <boolean>
RECOVER_FAILING_BATCH: <boolean> (Beta)
MAX_ROW: <num_rows>
MINIMAL_INTERVAL: <wait_time>
CONSISTENCY: { strong | at-least | at-most | none }
IDLE_DURATION: <idle_time>
[POLL:
BATCHSIZE: <num_records>
TIMEOUT: <poll_time>]
[TASK:
POST_BATCH_SQL: <udf_or_sql_to_run>
BATCH_INTERVAL: <num_batches>
PREPARE_SQL: <udf_or_sql_to_run>
TEARDOWN_SQL: <udf_or_sql_to_run> ]
[PROPERTIES:
<kafka_property_name>: <kafka_property_value>
[ ... ]]
[SCHEDULE:
RETRY_INTERVAL: <retry_time>
MAX_RETRIES: <num_retries>
RUNNING_DURATION: <run_time>
AUTO_STOP_RESTART_INTERVAL: <restart_time>
MAX_RESTART_TIMES: <num_restarts>
QUIT_AT_EOF_AFTER: <clock_time>]
[ALERT:
COMMAND: <command_to_run>
WORKDIR: <directory>
TIMEOUT: <alert_time>]
Where you may specify any property value with a template variable that GPSS substitutes at runtime using the following syntax:
<PROPERTY:> {{<template_var>}}
Description
You specify load configuration parameters for the gpsscli
and gpkafka
utilities in a YAML-formatted configuration file. (This reference page uses the name gpkafka.yaml
when referring to this file; you may choose your own name for the file.) Load parameters include VMware Tanzu Greenplum connection and target table information, Kafka broker and topic information, and error and commit thresholds.
Version 2 of the
gpkafka.yaml
configuration file syntax supportsKEY
andVALUE
blocks. Version 1 does not.
The gpsscli
and gpkafka
utilities process the YAML configuration file in order, using indentation (spaces) to determine the document hierarchy and the relationships between the sections. The use of white space in the file is significant, and keywords are case-sensitive.
Keywords and Values
Tanzu Greenplum Options
- DATABASE: db_name
- The name of the Tanzu Greenplum.
- USER: user_name
- The name of the Tanzu Greenplum user/role. This user_name must have permissions as described in the VMware Tanzu Greenplum Streaming Server documentation.
- PASSWORD: password
- The password for the Tanzu Greenplum user/role.
- HOST: host
- The host name or IP address of the Tanzu Greenplum coordinator host.
- PORT: greenplum_port
- The port number of the Tanzu Greenplum server on the coordinator host.
- VERSION: 2
- The version of the configuration file. You must specify
VERSION: 2
when you configureVALUE
and/orKEY
blocks in the file.
KAFKA:INPUT: Options
- SOURCE
-
Kafka input configuration parameters.
- BROKERS: kafka_broker_host:broker_port
- The host and port identifying the Kafka broker.
- TOPIC: kafka_topic
- The name of the Kafka topic from which to load data. The topic must exist.
- PARTITIONS: (partition_numbers)
- A single, a comma-separated list, and/or a range of partition numbers from which GPSS reads messages from the Kafka topic. A range that you specify with the
M...N
syntax includes both the range start and end values. By default, GPSS reads messages from all partitions of the Kafka topic.
Ensure that you do not configure multiple jobs that specify overlapping partition numbers in the same topic; GPSS behavior is undefined.
- FALLBACK_OFFSET: { earliest | latest }
- Specifies the behaviour of GPSS when it detects a Kafka message offset gap. When set to
earliest
, GPSS automatically resumes a load operation from the earliest available published message. When set tolatest
, GPSS loads only new messages to the Kafka topic. If this property is not set, GPSS returns an error.
- VALUE:
- The Kafka message value field names, data types, and format. You must specify all Kafka data elements in the order in which they appear in the Kafka message. Optional when you specify a
KEY
block; GPSS ignores the Kafka message value in this circumstance. - KEY:
- The Kafka message key field names, data types, and format. You must specify all Kafka key elements in the order in which they appear in the Kafka message. Optional when you specify a
VALUE
block; GPSS ignores the Kafka message key in this circumstance. - COLUMNS:NAME: column_name
-
The name of a key or value column. column_name must match the column name of the target Tanzu Greenplum table. Specify
__IGNORED__
to omit this Kafka message data element from the load operation. - The default source-to-target data mapping behaviour of GPSS is to match a column name as defined in
COLUMNS:NAME
with a column name in the target Tanzu GreenplumTABLE
. You can override the default mapping by specifying aMAPPING
block. - COLUMNS:TYPE: data_type
- The data type of the column. You must specify an equivalent data type for each non-ignored Kafka message data element and the associated Tanzu Greenplum table column.
- FORMAT: data_format
-
The format of the Kafka message key or value data. You may specify a
FORMAT
ofavro
,binary
,csv
,custom
,delimited
,json
, orjsonl
for the key and value, with some restrictions.- avro
- When you specify the
avro
data format, you must define only a singlejson
type column inCOLUMNS
. If the Kafka message key or value schema is registered in a Confluent Schema Registry, you must also provide theAVRO_OPTION
. - binary
- When you specify the
binary
data format, you must define only a singlebytea
type column inCOLUMNS
. - csv
- When you specify the
csv
data format, the message content cannot contain line ending characters (CR and LF).
You must not provide a
VALUE
block when you specifycsv
format for aKEY
block. Similarly, you must not provide aKEY
block when you specifycsv
format for aVALUE
block.- custom
- When you specify the
custom
data format, you must provide aCUSTOM_OPTION
. - delimited
- When you specify the
delimited
data format, you must provide aDELIMITED_OPTION
. - json
- When you specify the
json
data format, you must define only a singlejson
type column inCOLUMNS
. - jsonl
- When you specify the
jsonl
data format, you may provide aJSONL_OPTION
to define a newline character.
- AVRO_OPTION
-
Optional. When you specify
avro
as theFORMAT
, you may provideAVRO_OPTION
s that identify a schema registry location and optional SSL certificates and keys, and whether or not you want GPSS to convert Avrobytes
fields into base64-encoded strings.- SCHEMA_REGISTRY_ADDR: schemareg_host:schemareg_port
- When you specify
FORMAT: avro
and the Avro schema of the JSON data you want to load is registered in the Confluent Schema Registry, you must identify the host name and port number of each Confluent Schema Registry server in your Kafka cluster. You may specify more than one address, and at least one of the addresses must be legal. - SCHEMA_CA_ON_GPDB: sr_ca_file_path
- The file system path to the CA certificate that GPSS uses to verify the peer. This file must reside in sr_ca_file_path on all Tanzu Greenplum segment hosts.
- SCHEMA_CERT_ON_GPDB: sr_cert_file_path
- The file system path to the client certificate that GPSS uses to connect to the HTTPS schema registry. This file must reside in sr_cert_file_path on all Tanzu Greenplum segment hosts.
- SCHEMA_KEY_ON_GPDB: sr_key_file_path
- The file system path to the private key file that GPSS uses to connect to the HTTPS schema registry. This file must reside in sr_key_file_path on all Tanzu Greenplum segment hosts.
- SCHEMA_MIN_TLS_VERSION: minimum_version
- The minimum transport layer security (TLS) version that GPSS requests on the connection to the schema registry. Supported versions are
1.0
,1.1
,1.2
, or1.3
. The default minimum TLS version is1.0
. - SCHEMA_PATH_ON_GPDB: path_to_file
- When you specify the
avro
format and the Avro schema of the JSON key or value data that you want to load is specified in a separate.avsc
file, you must identify the file system location in path_to_file, and the file must reside in this location on every Tanzu Greenplum segment host.
GPSS does not cache the schema. GPSS must reload the schema for every batch of Kafka data. Also, GPSS supports providing the schema for either the key or the value, but not both.
- BYTES_TO_BASE64: boolean
- When
true
, GPSS converts Avrobytes
fields into base64-encoded strings. The default value isfalse
, GPSS does not perform the conversion.
- CSV_OPTION
-
When you specify
FORMAT: csv
, you may provide the following options:- DELIMITER: delim_char
- Specifies a single ASCII character that separates columns within each message or row of data. The default delimiter is a comma (
,
). - QUOTE: quote_char
- Specifies the quotation character. Because GPSS does not provide a default value for this property, you must specify a value.
- NULL_STRING: nullstr_val
- Specifies the string that represents the null value. Because GPSS does not specify a default value for this property, you must specify a value.
- ESCAPE: escape_char
- Specifies the single character that is used for escaping data characters in the content that might otherwise be interpreted as row or column delimiters. Make sure to choose an escape character that is not used anywhere in your actual column data. Because GPSS does not provide a default value for this property. you must specify a value.
- FORCE_NOT_NULL: columns
- Specifies a comma-separated list of column names to process as though each column were quoted and hence not a NULL value. For the default
null_string
(nothing between two delimiters), missing values are evaluated as zero-length strings. - FILL_MISSING_FIELDS: boolean
- Specifies the action of GPSS when it reads a row of data that has missing trailing field values (the row has missing data fields at the end of a line or row). The default value is
false
, GPSS returns an error when it encounters a row with missing trailing field values. - If set to
true
, GPSS sets missing trailing field values toNULL
. Blank rows, fields with aNOT NULL
constraint, and trailing delimiters on a line will still generate an error.
- JSONL_OPTION
-
Optional. When you specify
FORMAT: jsonl
, you may choose to provide theJSONL_OPTION
properties.- NEWLINE: newline_str
- A string that specifies the new line character(s) that end each JSON record. The default newline is
"\n"
.
- CUSTOM_OPTION
-
Optional. When you specify
FORMAT: custom
, you are required to provide theCUSTOM_OPTION
properties. This block identifies the name and the arguments of a custom formatter user-defined function.- NAME: udf_name
- The name of the custom formatter user-defined function.
- PARAMSTR: udf_parameter_string
- A string specifying the comma-separated list of arguments to pass to the custom formatter user-defined function.
- DELIMITED_OPTION
-
Optional. When you specify
FORMAT: delimited
, you may choose to provide theDELIMITER_OPTION
properties.- DELIMITER: delimiter_string
- When you specify the
delimited
format, delimiter_string is required and must identify the data element delimiter. delimiter_string may be a multi-byte value, and up to 32 bytes in length. It may not contain quote and escape characters. - EOL_PREFIX: prefix_string
- Specifies the prefix before the end of line character (
\n
) that indicates the end of a row. The default prefix is empty. - QUOTE: quote_char
- Specifies the single ASCII quotation character. The default quote character is empty.
- If you do not specify a quotation character, GPSS assumes that all columns are unquoted. If you do not specify a quotation character and do specify an escape character, GPSS assumes that all columns are unquoted and escapes the delimiter, end-of-line prefix, and escape itself.
- When you specify a quotation character, you must specify an escape character. GPSS reads any content between quote characters as-is, except for escaped characters.
- ESCAPE: escape_char
- Specifies the single ASCII character used to escape special characters (for example, the delimiter, end-of-line prefix, quote, or escape itself). Therdefault escape character is empty.
- When you specify an escape character and do not specify a quotation character, GPSS escapes only the delimiter, end-of-line prefix, and escape itself.
- When you specify both an escape character and a quotation character, GPSS escapes only these characters.
- META:
-
The field name, type, and format of the Kafka meta data.
META
must specify a singlejson
orjsonb
(Greenplum 6 only) type column andFORMAT: json
. The available Kafka meta data properties include:topic
(text) - the Kafka topic namepartition
(int) - the partition numberoffset
(bigint) - the record location within the partitiontimestamp
(bigint) - the time that the message was appended to the Kafka log
You can load any of these properties into the target table with a
MAPPING
, or use a property in the update or merge criteria for a load operation. - TRANSFORMER:
-
Input data transform block. An input data transformer is a plugin, a set of
go
functions that transform the data after it is read from the source. The semantics of the transform are function-specific. You specify the library and function names in this block, as well as the properties that GPSS passes to these functions:- PATH: path_to_plugin_transform_library
- The file system location of the plugin transformer library on the Tanzu Greenplum streaming server server host.
- ON_INIT: plugin_transform_init_name
- The name of an initialization function that GPSS calls when it loads the transform library.
- TRANSFORM: plugin_transform_name
- The name of the transform function. GPSS invokes this function for every message it reads.
- PROPERTIES: plugin_transform_property_name: property_value
- One or more property name and value pairs that GPSS passes to plugin_transform_init_name and plugin_transform_name.
- FILTER: filter_string
- The filter to apply to the Kafka input messages before GPSS loads the data into Tanzu Greenplum. If the filter evaluates to
true
, GPSS loads the message. If the filter evaluates tofalse
, the message is dropped. filter_string must be a valid SQL conditional expression and may reference one or moreKEY
,VALUE
, orMETA
column names. - ENCODING: char_set
- The source data encoding. You can specify an encoding character set when the source data is of the
csv
,custom
,delimited
, orjson
format. GPSS supports the character sets identified in Character Set Support in the Tanzu Greenplum documentation. - ERROR_LIMIT: { num_errors | percentage_errors }
- The error threshold, specified as either an absolute number or a percentage.
gpkafka load
exits when this limit is reached. The defaultERROR_LIMIT
is zero; GPSS deactivates error logging and stops the load operation when it encounters the first error. Due to a limitation of the Tanzu Greenplum external table framework, GPSS does not acceptERROR_LIMIT: 1
.
KAFKA:OUTPUT: Options
You must specify only one of the
OUTPUT
orOUTPUTS
blocks. You cannot specify both.
- SCHEMA: output_schema_name
- The name of the Tanzu Greenplum schema in which table_name resides. Optional, the default schema is the
public
schema. - TABLE: table_name
- The name of the Tanzu Greenplum table into which GPSS loads the Kafka data.
- FILTER: output_filter_string
- The filter to apply to the output data before GPSS loads the data into Tanzu Greenplum. If the filter evaluates to
true
, GPSS loads the message. If the filter evaluates tofalse
, the message is dropped. output_filter_string must be a valid SQL conditional expression and may reference one or moreMETA
orVALUE
column names. - MODE: mode
-
The table load mode. Valid mode values are
INSERT
,MERGE
, orUPDATE
. The default value isINSERT
. -
UPDATE
- Updates the target table columns that are listed inUPDATE_COLUMNS
when the input columns identified inMATCH_COLUMNS
match the named target table columns and the optionalUPDATE_CONDITION
is true. -
UPDATE
is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column. -
MERGE
- Inserts new rows and updates existing rows when:- columns are listed in
UPDATE_COLUMNS
, - the
MATCH_COLUMNS
target table column values are equal to the input data, and - an optional
UPDATE_CONDITION
is specified and met.
Deletes rows when:
- the
MATCH_COLUMNS
target table column values are equal to the input data, and - an optional
DELETE_CONDITION
is specified and met.
New rows are identified when the
MATCH_COLUMNS
value in the source data does not have a corresponding value in the existing data of the target table. In those cases, the entire row from the source file is inserted, not only theMATCH_COLUMNS
andUPDATE_COLUMNS
. If there are multiple newMATCH_COLUMNS
values in the input data that are the same, GPSS inserts or updates the target table using a random matching input row. When you specifyORDER_COLUMNS
, GPSS sorts the input data on the specified column(s) and inserts or updates from the input row with the largest value. - columns are listed in
MERGE
is not supported if the target table column name is a reserved keyword, has capital letters, or includes any character that requires quotes (" ") to identify the column.- MATCH_COLUMNS:
-
Required if
MODE
isMERGE
orUPDATE
.- match_column_name
- Specifies the column(s) to use as the join condition for the update. The attribute value in the specified target column(s) must be equal to that of the corresponding source data column(s) in order for the row to be updated in the target table.
- ORDER_COLUMNS:
-
Optional. May be specified in
MERGE
MODE
to sort the input data rows.- order_column_name
- Specify the column(s) by which GPSS sorts the rows. When multiple matching rows exist in a batch,
ORDER_COLUMNS
is used withMATCH_COLUMNS
to determine the input row with the largest value; GPSS uses that row to write/update the target.
- UPDATE_COLUMNS:
-
Required if
MODE
isMERGE
orUPDATE
.- update_column_name
- Specifies the column(s) to update for the rows that meet the
MATCH_COLUMNS
criteria and the optionalUPDATE_CONDITION
.
- UPDATE_CONDITION: update_condition
- Optional. Specifies a boolean condition, similar to that which you would declare in a
WHERE
clause, that must be met in order for a row in the target table to be updated (or inserted, in the case of aMERGE
). - DELETE_CONDITION: delete_condition
- Optional. In
MERGE
MODE
, specifies a boolean condition, similar to that which you would declare in aWHERE
clause, that must be met for GPSS to delete rows in the target table that meet theMATCH_COLUMNS
criteria. - TRANSFORMER:
-
Optional. Output data transform block. An output data transformer is a user-defined function (UDF) that transforms the data before it is loaded into Tanzu Greenplum. The semantics of the UDF are transform-specific.
GPSS currently supports specifying only one of the
MAPPING
or (UDF)TRANSFORMER
blocks in the load configuration file, not both.- TRANSFORM: udf_transform_udf_name
- The name of the output transform UDF. GPSS invokes this function for every batch of data it writes to Tanzu Greenplum.
- PROPERTIES: udf_transform_property_name: property_value
- One or more property name and value pairs that GPSS passes to udf_transform_udf_name.
- COLUMNS: udf_transform_column_name
- The name of one or more columns involved in the transform.
- MAPPING:
-
Optional. Overrides the default source-to-target column mapping. GPSS supports two mapping syntaxes.
GPSS currently supports specifying only one of the
MAPPING
or (UDF)TRANSFORMER
blocks in the load configuration file, not both. -
When you specify a
MAPPING
, ensure that you provide a mapping for all Kafka message key and value elements of interest. GPSS does not automatically match column names when you provide aMAPPING
.- NAME: target_column_name
- Specifies the target Tanzu Greenplum table column name.
- EXPRESSION: { source_column_name | expression }
- Specifies a Kafka
COLUMNS:NAME
(source_column_name) or an expression. When you specify an expression, you may provide a value expression that you would specify in theSELECT
list of a query, such as a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so on. - target_column_name: { source_column_name | expression }
- When you use this
MAPPING
syntax, specify the target_column_name and {source_column_name | expression} as described above.
KAFKA:OUTPUTS: Options
You must specify only one of the
OUTPUT
orOUTPUTS
blocks. You cannot specify both.
- TABLE: table_name
- The name of a Tanzu Greenplum table into which GPSS loads the Kafka data.
- other options
- As specified in the KAFKA:OUTPUT: Options section.
KAFKA:METADATA: Options
- SCHEMA: metadata_schema_name
- The name of the Tanzu Greenplum schema in which GPSS creates external and history tables. The default metadata_schema_name is
KAFKA:OUTPUT:SCHEMA
.
Tanzu Greenplum COMMIT: Options
- COMMIT:
-
Controls how
gpkafka load
commits a batch of data to Tanzu Greenplum. You may specify bothMAX_ROW
andMINIMAL_INTERVAL
as long as both values are not zero (0
). Try setting and tuningMINIMAL_INTERVAL
to your environment; introduce aMAX_ROW
setting only if you encounter high memory usage associated with message buffering.- SAVE_FAILING_BATCH: boolean
- Determines whether or not GPSS saves data into a backup table before it writes the data to Tanzu Greenplum. Saving the data in this manner aids recovery when GPSS encounters errors during the evaluation of expressions. The default is
false
; GPSS does not use a backup table, and returns immediately when it encounters an expression error. When you set this property totrue
, GPSS writes both the good and the bad data in the batch to a backup table namedgpssbackup_<jobhash>
, and continues to process incoming Kafka messages. You must then manually load the good data from the backup table into Greenplum or setRECOVER_FAILING_BATCH
(Beta) totrue
to have GPSS automatically reload the good data. -
Using a backup table to hedge against mapping errors may impact performance, especially when the data that you are loading has not been cleaned.
- RECOVER_FAILING_BATCH: boolean (Beta)
- When set to
true
andSAVE_FAILING_BATCH
is alsotrue
, GPSS automatically reloads the good data in the batch and retains only the error data in the backup table. The default value isfalse
; GPSS does not process the backup table. -
Enabling this property requires that GPSS has the Tanzu Greenplum privileges to create a function.
- MAX_ROW: number_of_rows
- The number of rows to batch before triggering an
INSERT
operation on the Tanzu Greenplum table. The default value ofMAX_ROW
is0
, which instructs GPSS to ignore this commit trigger condition. - MINIMAL_INTERVAL: wait_time
- The minimum amount of time to wait (milliseconds) between each
INSERT
operation on the table. The default value is5000
. - CONSISTENCY: { strong | at-least | at-most | none }
- Specify how GPSS should manage message offsets when it acts as a high-level consumer. Valid values are
strong
,at-least
,at-most
, andnone
. The default value isstrong
. Refer to Understanding Kafka Message Offset Management for more detailed information. - IDLE_DURATION: idle_time
-
The maximum amount of time to wait (milliseconds) for the first batch of Kafka data. When you use this property to enable lazy load, GPSS waits until Kafka data is available before locking the target Greenplum table. You can specify:
0
(lazy load is deactivates)-1
(lazy load is activated, the job never stops), or- a positive value (lazy load is activated, the job stops after idle_time duration of no data in the Kafka topic)
The default value is
0
.
Kafka POLL: Options
The
POLL
properties are deprecated and ignored by GPSS.
- POLL:
-
Controls the polling time period and batch size when reading Kafka data.
- BATCHSIZE: num_records
- The number of Kafka records in a batch.
BATCHSIZE
should be smaller thanCOMMIT:MAX_ROW
. The default batch size is 200. - TIMEOUT: poll_time
- The maximum time, in milliseconds, to wait in a polling cycle if Kafka data is not available. You must specify a
TIMEOUT
greater than100
milliseconds and less thanCOMMIT:MINIMAL_INTERVAL
. The default poll timeout is 1000 milliseconds.
Tanzu Greenplum TASK: Options
- TASK:
-
Controls the execution and scheduling of a periodic (maintenance) task.
- POST_BATCH_SQL: udf_or_sql_to_run
- The user-defined function or SQL command(s) that you want to run after the specified number of batches are read from Kafka. The default is null.
- BATCH_INTERVAL: num_batches
- The number of batches to read before running udf_or_sql_to_run. The default batch interval is 0.
- PREPARE_SQL: udf_or_sql_to_run
- The user-defined function or SQL command(s) that you want GPSS to run before it executes the job. The default is null, no command to run.
- TEARDOWN_SQL: udf_or_sql_to_run
- The user-defined function or SQL command(s) that you want GPSS to run after the job stops. GPSS runs the function or command(s) on job success and job failure. The default is null, no command to run.
Kafka PROPERTIES: Options
- PROPERTIES:
-
Kafka consumer configuration property names and values.
- kafka_property_name
- The name of a Kafka property.
- kafka_property_value
- The Kafka property value.
Job SCHEDULE: Options
- SCHEDULE:
-
Controls the frequency and interval of restarting jobs.
- RETRY_INTERVAL: retry_time
- The period of time that GPSS waits before retrying a failed job. You can specify the time interval in day (
d
), hour (h
), minute (m
), second (s
), or millisecond (ms
) integer units; do not mix units. The default retry interval is5m
(5 minutes). - MAX_RETRIES: num_retries
- The maximum number of times GPSS attempts to retry a failed job. The default is 0, do not retry. If you specify a negative value, GPSS retries the job indefinitely.
- RUNNING_DURATION: run_time
- The amount of time after which GPSS automatically stops a job. GPSS does not automatically stop a job by default.
- AUTO_STOP_RESTART_INTERVAL: restart_time
- The amount of time after which GPSS restarts a job that it stopped due to reaching
RUNNING_DURATION
. - MAX_RESTART_TIMES: num_restarts
- The maximum number of times that GPSS restarts a job that it stopped due to reaching
RUNNING_DURATION
. The default is 0, do not restart the job. If you specify the value-1
, GPSS restarts the job indefinitely. You may usegpsscli stop
to stop the jobs from being restarted indefinitely. - QUIT_AT_EOF_AFTER: clock_time
- The clock time after which GPSS stops a job every day when it encounters an EOF. By default, GPSS does not automatically stop a job that reaches EOF. GPSS never stops a job when the current time is before
clock_time
, even when GPSS encounters an EOF.
- Job ALERT: Options
-
Controls notification when a job is stopped for any reason (success, completion, error, user-initiated stop).
- COMMAND: command_to_run
- The program that the GPSS server runs on the GPSS server host, including arguments. The command must be executable by GPSS.
- command_to_run has access to job-related environment variables that GPSS sets, including:
$GPSSJOB_NAME
,$GPSSJOB_STATUS
, and$GPSSJOB_DETAIL
. - WORKDIR: directory
- The working directory for command_to_run. The default working directory is the directory from which you started the GPSS server process. If you specify a relative path, it is relative to the directory from which you started the GPSS server process.
- TIMEOUT: alert_time
- The amount of time after a job stops, prompting GPSS to trigger the alert (and run command_to_run). You can specify the time interval in day (
d
), hour (h
), minute (m
), or second (s
) integer units; do not mix units. The default alert timeout is-1s
(no timeout).
Template Variables
GPSS supports using template variables to specify property values in the load configuration file.
You specify a template variable value in the load configuration file as follows:
<PROPERTY>: {{<template_var>}}
For example:
MAX_RETRIES: {{numretries}}
GPSS substitutes the template variable with a value that you specify via the -p | --property <template_var=value>
option to the gpsscli dryrun
, gpsscli submit
, gpsscli load
, or gpkafka load
command.
For example, if the command line specifies:
--property numretries=10
GPSS substitutes occurrences of {{numretries}}
in the load configuration file with the value 10
before submitting the job, and uses that value while the job is running.
Notes
If you created a database object name using a double-quoted identifier (delimited identifier), you must specify the delimited name within single quotes in the gpkafka.yaml
configuration file. For example, if you create a table as follows:
CREATE TABLE "MyTable" ("MyColumn" text);
Your gpkafka.yaml
YAML configuration file would refer to the above table and column names as:
COLUMNS:
- name: '"MyColumn"'
type: text
OUTPUT:
TABLE: '"MyTable"'
GPSS requires Kafka version 0.11 or newer for exactly-once delivery assurance. You can run with an older version of Kafka (but lose the exactly-once guarantee) by adding the following PROPERTIES
block to your gpkafka-v2.yaml
load configuration file:
PROPERTIES:
api.version.request: false
broker.version.fallback: 0.8.2.1
You can specify backslash escape sequences in the CSV DELIMITER
, QUOTE
, and ESCAPE
options. GPSS supports the standard backslash escape sequences for backspace, form feed, newline, carriage return, and tab, as well as escape sequences that you specify in hexadecimal format (prefaced with \x
). Refer to Backslash Escape Sequences in the PostgreSQL documentation for more information.
Examples
Load data from Kafka as defined in the Version 2 configuration file named kafka2greenplumv2.yaml
:
gpkafka load kafka2greenplumv2.yaml
Example kafka2greenplumv2.yaml
configuration file:
DATABASE: ops
USER: gpadmin
HOST: mdw-1
PORT: 5432
VERSION: 2
KAFKA:
INPUT:
SOURCE:
BROKERS: kbrokerhost1:9092
TOPIC: customer_expenses2
PARTITIONS: (2, 5...7, 13)
VALUE:
COLUMNS:
- NAME: c1
TYPE: json
FORMAT: avro
AVRO_OPTION:
SCHEMA_REGISTRY_ADDR: http://localhost:8081
KEY:
COLUMNS:
- NAME: key
TYPE: json
FORMAT: avro
AVRO_OPTION:
SCHEMA_REGISTRY_ADDR: http://localhost:8081
META:
COLUMNS:
- NAME: meta
TYPE: json
FORMAT: json
ERROR_LIMIT: 25
OUTPUT:
SCHEMA: payables
TABLE: expenses2
MAPPING:
- NAME: customer_id
EXPRESSION: (c1->>'cust_id')::int
- NAME: newcust
EXPRESSION: ((c1->>'cust_id')::int > 5000000)::boolean
- NAME: expenses
EXPRESSION: (c1->>'expenses')::decimal
- NAME: tax_due
EXPRESSION: ((c1->>'expenses')::decimal * .075)::decimal
- NAME: t
EXPRESSION: (meta->>'topic')::text
METADATA:
SCHEMA: gpkafka_internal
COMMIT:
MINIMAL_INTERVAL: 2000
See Also
Loading Avro Data from Kafka, gpkafka.yaml, gpkafka load, gpsscli load, gpsscli submit
Content feedback and comments