Load data from Kafka into VMware Tanzu Greenplum.
Synopsis
gpkafka load <jobconfig.yaml>
[--name <job_name>]
[-f | --force] [--quit-at-eof] [--partition]
[{--force-reset-earliest | --force-reset-latest | --force-reset-timestamp <tstamp>}]
[-p | --property <template_var=value>]
[--config <gpfdistconfig.json>]
[--gpfdist-host <hostaddr>] [--gpfdist-port <portnum>]
[--debug-port <portnum>]
[--color] [--csv-log]
[-l | --log-dir <directory>] [--verbose]
gpkafka load {-h | --help}
Description
gpkafka load is a wrapper around the VMware Tanzu Greenplum streaming server (GPSS) gpss and gpsscli utilities. Starting in Tanzu Greenplum streaming server version 1.3.2, gpkafka load no longer launches a gpss server instance, but rather calls the backend server code directly.
When you run gpkafka load, the command submits, starts, and stops a GPSS job on your behalf.
VMware recommends that you migrate to using the GPSS utilities directly.
The gpkafka load utility loads data from a Kafka topic into a Tanzu Greenplum table. When you run the command, you provide a YAML-formatted configuration file that defines load parameters such as the Tanzu Greenplum connection options, the Kafka broker and topic, and the target Tanzu Greenplum table.
gpkafka load uses the gpfdist or gpfdists protocol to load data into Greenplum. You can configure the protocol options by providing a JSON-formatted GPSS configuration file via the --config gpfdistconfig.json option to the command, or by specifying the --gpfdist-host hostaddr and/or --gpfdist-port portnum options.
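As a rough illustration of the two ways to set the protocol options, the following shell sketch writes a small gpfdistconfig.json and prints the corresponding commands instead of running them. The host name etlhost.example.com, the port 8319, the loadcfg.yaml file name, and the temporary directory are placeholder assumptions. The sketch also assumes that a file containing only the Gpfdist block is acceptable; the gpss.json reference is the authoritative source for the file format.

```shell
# Placeholder values (assumptions for illustration only).
gpfdist_host="etlhost.example.com"
gpfdist_port=8319

# Write a minimal configuration file holding only the Gpfdist block,
# using the Host and Port property names mentioned in this reference.
cfg_dir=$(mktemp -d)
cfg="${cfg_dir}/gpfdistconfig.json"
cat > "$cfg" <<EOF
{
    "Gpfdist": {
        "Host": "${gpfdist_host}",
        "Port": ${gpfdist_port}
    }
}
EOF

# Print the commands rather than running them, so the sketch has no side effects.
# Option 1: take the protocol settings from the configuration file.
echo "gpkafka load --config ${cfg} loadcfg.yaml"
# Option 2: set the same host and port directly on the command line.
echo "gpkafka load --gpfdist-host ${gpfdist_host} --gpfdist-port ${gpfdist_port} loadcfg.yaml"
```

When both forms are combined, the command-line host and port take precedence over the values in the file, as described for --gpfdist-host and --gpfdist-port.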
By default, gpkafka load loads all Kafka messages published to the topic, and then waits indefinitely for new messages to load. When you provide the --quit-at-eof option to the command, the utility exits after it reads all published messages and writes the data to Tanzu Greenplum.
If you provide the --debug-port option, gpkafka load displays debug information to stdout during the load operation and starts a debug server from which you can obtain additional debug information.
After a user interrupt or exit, a subsequent gpkafka load operation that specifies the same Kafka topic and the same Tanzu Greenplum database, schema, and table names resumes from the last recorded offset. If GPSS detects an offset mismatch, you can choose to resume the load operation from the earliest available offset for the topic, to load only new messages published to the topic, or to load messages published since a specific time.
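Loading messages published since a specific time corresponds to the --force-reset-timestamp option, which expects epoch time in milliseconds. The following is a minimal sketch of deriving such a value from a calendar date. It assumes GNU date syntax for the -d option, and the date and the loadcfg.yaml file name are arbitrary examples. The command is printed rather than run.

```shell
# Example start time in UTC (an arbitrary date chosen for illustration).
start_utc='2024-03-01 00:00:00'

# date prints whole seconds since the epoch; multiply by 1000 for milliseconds.
# The -d option as used here assumes GNU date.
tstamp_ms=$(( $(date -u -d "$start_utc" +%s) * 1000 ))

# Print the resulting command instead of running it.
echo "gpkafka load --force-reset-timestamp ${tstamp_ms} loadcfg.yaml"
```

The computed value must still fall between the earliest message time and the current time for the topic.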
Options
- jobconfig.yaml
- The Version 1 (deprecated), Version 2, or Version 3 YAML-formatted configuration file that defines the load operation parameters. If the filename provided is not an absolute path, Tanzu Greenplum assumes the file system location is relative to the current working directory. Refer to gpkafka.yaml and gpkafka-v2.yaml for the format and content of the parameters that you specify in Versions 1 and 2 of this file. Refer to gpkafka-v3.yaml for Version 3 format information.
- --name job_name
- Use job_name to identify the job. If you do not provide a name, the command assigns a unique identifier to the job.
- -f | --force
- Force GPSS to reload the configuration of a running job. GPSS stops the job, updates the job with the configuration specified in jobconfig.yaml, and then restarts the job. If you previously named the job, you must provide --name job_name when you force a job configuration reload with this option.
- Do not attempt to update a configuration property that GPSS uses to uniquely identify a Kafka job (the Kafka topic name and the Tanzu Greenplum database, schema, and table names). If you change any such configuration property, GPSS creates a new internal job and loads all available messages.
- --quit-at-eof
- When you specify this option, gpkafka load exits after it reads all of the Kafka messages published to the topic. The default behaviour of gpkafka load is to wait indefinitely for, and then consume, new Kafka messages published to the topic. gpkafka load ignores job retry SCHEDULE configuration settings when it is invoked with the --quit-at-eof flag.
- --partition
- By default, gpkafka load outputs the job progress by batch, and displays the start and end times, the message number and size, the number of inserted and rejected rows, and the transfer speed per batch. When you specify the --partition option, gpkafka load outputs the job progress by partition, and displays the partition identifier, the start and end times, the beginning and ending offsets, the message size, and the transfer speed per partition.
- --force-reset-earliest
- gpkafka load returns an error if its recorded offset does not match the Kafka message offset for the topic. Re-run gpkafka load and specify the --force-reset-earliest option to resume the load operation from the earliest available message published to the Kafka topic. --force-reset-earliest specified on the command line takes precedence over a FALLBACK_OFFSET/fallback_offset set in the jobconfig.yaml.
- --force-reset-latest
- gpkafka load returns an error if its recorded offset does not match the Kafka message offset for the topic. Re-run gpkafka load and specify the --force-reset-latest option to load only new data messages published to the Kafka topic. --force-reset-latest specified on the command line takes precedence over a FALLBACK_OFFSET/fallback_offset set in the jobconfig.yaml.
- --force-reset-timestamp tstamp
- Specify the --force-reset-timestamp option to load Kafka messages published to the topic from the offset associated with the specified time. tstamp must specify epoch time in milliseconds, and is bounded by the earliest message time and the current time.
- -p | --property template_var=value
- Substitute value for instances of the property value template {{template_var}} referenced in the jobconfig.yaml load configuration file.
- --config gpfdistconfig.json
- The GPSS configuration file. This file includes properties that configure the gpfdist/s protocol used for the load request. Refer to gpss.json for detailed information about the format of this file and the configuration properties supported.
- gpkafka load reads the configuration specified in the Gpfdist protocol block of the gpfdistconfig.json file; it ignores the GPSS configuration specified in the ListenAddress block of the file.
- --gpfdist-host hostaddr
- The gpfdist service host name or IP address that GPSS sets in the external table LOCATION clause. If specified, overrides a Gpfdist:Host value provided in gpfdistconfig.json.
- --gpfdist-port portnum
- The gpfdist service port number. If specified, overrides a Gpfdist:Port value provided in gpfdistconfig.json.
- --debug-port portnum
- When you specify this option, gpkafka load starts a debug server at the port identified by portnum; additional debug information including the call stack and performance statistics is available via curl http://gpkafkahost:portnum/debug/pprof/.
- --color
- Enable the use of color when displaying front-end log messages. When specified, GPSS colors the log level in messages that it writes to stdout. Color is deactivated by default.
- GPSS ignores the --color option if you also specify --csv-log.
- --csv-log
- Write front-end log messages in CSV format. By default, GPSS writes log messages to stdout using spaces between fields for a more human-readable format.
- -l | --log-dir directory
- Specify the directory to which GPSS writes client command log files. GPSS must have write permission to the directory. GPSS creates the log directory if it does not exist.
- If you do not provide this option, GPSS writes client log files to the $HOME/gpAdminLogs directory.
- --verbose
- The default behaviour of the command utility is to display information and error messages to stdout. When you specify the --verbose option, GPSS also outputs debug-level messages about the operation.
- -h | --help
- Show command utility help, and then exit.
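To make the -p | --property option more concrete, the sketch below mimics its documented effect with sed: each {{template_var}} placeholder is replaced by the supplied value. This illustrates the outcome only and is not how GPSS implements substitution. The template line, the variable name kafka_topic, and the value orders are hypothetical, and the sed expression assumes the value contains no slash characters.

```shell
# Hypothetical line as it might appear in a jobconfig.yaml template.
template='topic: {{kafka_topic}}'

# Property assignment as it would be passed on the command line:
#   gpkafka load -p kafka_topic=orders jobconfig.yaml
property='kafka_topic=orders'

# Split the assignment into the variable name and its value.
var=${property%%=*}
value=${property#*=}

# Replace every {{var}} placeholder with the value (illustration only).
rendered=$(printf '%s\n' "$template" | sed "s/{{${var}}}/${value}/g")
echo "$rendered"
```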
Examples
Stream Kafka data into Tanzu Greenplum using the load parameters defined in a configuration file named loadcfg.yaml located in the current directory:
gpkafka load loadcfg.yaml
Load Kafka data into Tanzu Greenplum using a configuration file located in the current directory named loadcfg.yaml; exit the load operation after reading all Kafka messages published to the topic:
gpkafka load --quit-at-eof loadcfg.yaml