This example uses the Greenplum Streaming Server client utility, `gpsscli`, rather than the `gpkafka` utility, to load JSON-format data from Kafka into Greenplum Database.

In this example, you load JSON-format data from a Kafka topic named `topic_json_gpkafka` into a Greenplum Database table named `json_from_kafka`. You perform the load as the Greenplum role `gpadmin`. The table `json_from_kafka` resides in the `public` schema in a Greenplum database named `testdb`.
A producer of the Kafka `topic_json_gpkafka` topic emits customer expense messages in JSON format. Each message includes the customer identifier (integer), the month (integer), and an expense amount (decimal). For example, a message for a customer with identifier 123 who spent $456.78 in the month of September follows:

```json
{ "cust_id": 123, "month": 9, "expenses": 456.78 }
```
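For illustration only, a hypothetical producer could build such a message with Python's standard `json` module before publishing it to the topic (Kafka delivery itself is out of scope here; the field names mirror the sample data used later in this procedure):

```python
import json

# Build one customer expense record; the keys mirror the message
# schema described above (this record is illustrative, not required).
record = {"cust_id": 123, "month": 9, "expenses": 456.78}

# Serialize to the JSON text that would be published to the topic.
message = json.dumps(record)
print(message)
```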
You will run a Kafka console producer to emit JSON-format customer expense messages, start a Greenplum Streaming Server instance, and use the GPSS `gpsscli` subcommands to load the data into the `json_from_kafka` table.
Prerequisites
Before you start this procedure, ensure that you:
- Have administrative access to running Kafka and Greenplum Database clusters.
- Have configured connectivity as described in both the Greenplum Streaming Server Prerequisites and the Kafka Prerequisites sections.
- Identify and note the ZooKeeper hostname and port.
- Identify and note the hostname and port of the Kafka broker(s).
- Identify and note the hostname and port of the Greenplum Database coordinator node.
- Register the GPSS extension.
This procedure assumes that you have installed the Apache Kafka distribution. If you are using a different Kafka distribution, you may need to adjust certain commands in the procedure.
Procedure
Log in to a host in your Kafka cluster. For example:

```shell
$ ssh kafkauser@kafkahost
kafkahost$
```
Create a Kafka topic named `topic_json_gpkafka`. For example:

```shell
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
    --zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
    --topic topic_json_gpkafka
```
Open a file named `sample_data.json` in the editor of your choice. For example:

```shell
kafkahost$ vi sample_data.json
```
Copy/paste the following text to add JSON-format data into the file, and then save and exit:

```json
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }
```
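As a sanity check, you can compute from the sample data alone the per-customer totals you would expect Greenplum to hold after the load. This standalone Python sketch (not part of the GPSS procedure) sums the nine sample records:

```python
import json
from collections import defaultdict

# The nine sample messages from sample_data.json, one JSON object per line.
sample_lines = """\
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }
""".splitlines()

# Total expenses per customer across all three months.
totals = defaultdict(float)
for line in sample_lines:
    msg = json.loads(line)
    totals[msg["cust_id"]] += msg["expenses"]

print(dict(totals))
```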
Stream the contents of the `sample_data.json` file to a Kafka console producer. For example:

```shell
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic topic_json_gpkafka < sample_data.json
```
Verify that the Kafka console producer published the messages to the topic by running a Kafka console consumer. For example:
```shell
kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 --topic topic_json_gpkafka \
    --from-beginning
```
Open a new terminal window, log in to the Greenplum Database coordinator host as the `gpadmin` administrative user, and set up the Greenplum environment. For example:

```shell
$ ssh gpadmin@gpcoord
gpcoord$ . /usr/local/greenplum-db/greenplum_path.sh
```
Construct the Greenplum Streaming Server configuration file. For example, open a file named `gpsscfg_ex.json` in the editor of your choice:

```shell
gpcoord$ vi gpsscfg_ex.json
```
Designate a GPSS listen port number of 5019 and a `gpfdist` port number of 8319 in the configuration file. For example, copy/paste the following into the `gpsscfg_ex.json` file, and then save and exit the editor:

```json
{
    "ListenAddress": {
        "Host": "",
        "Port": 5019
    },
    "Gpfdist": {
        "Host": "",
        "Port": 8319
    }
}
```
Start the Greenplum Streaming Server instance in the background, specifying the log directory `./gpsslogs`. For example:

```shell
gpcoord$ gpss --config gpsscfg_ex.json --log-dir ./gpsslogs &
```
Construct the load configuration file. Open a file named `jsonload_cfg.yaml` in the editor of your choice. For example:

```shell
gpcoord$ vi jsonload_cfg.yaml
```
Fill in the load configuration parameter values based on your environment. This example assumes:

- Your Greenplum Database coordinator hostname is `gpcoord`.
- The Greenplum Database server is running on the default port.
- Your Kafka broker host and port is `localhost:9092`.
- You want to write the Kafka data to a Greenplum Database table named `json_from_kafka` located in the `public` schema of a database named `testdb`.
- You want to write the customer identifier and expenses data to Greenplum.
The `jsonload_cfg.yaml` file would include the following contents:

```yaml
DATABASE: testdb
USER: gpadmin
HOST: gpcoord
PORT: 5432
KAFKA:
   INPUT:
      SOURCE:
         BROKERS: localhost:9092
         TOPIC: topic_json_gpkafka
      COLUMNS:
         - NAME: jdata
           TYPE: json
      FORMAT: json
      ERROR_LIMIT: 10
   OUTPUT:
      TABLE: json_from_kafka
      MAPPING:
         - NAME: customer_id
           EXPRESSION: (jdata->>'cust_id')::int
         - NAME: month
           EXPRESSION: (jdata->>'month')::int
         - NAME: amount_paid
           EXPRESSION: (jdata->>'expenses')::decimal
   COMMIT:
      MINIMAL_INTERVAL: 2000
```
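The MAPPING entries use PostgreSQL's JSON operators: `jdata->>'key'` extracts a field from the JSON column as text, and the `::type` suffix casts it to the target column type. The real transformation runs inside Greenplum; the following Python sketch only mimics, per message, what that mapping effectively produces:

```python
import json
from decimal import Decimal

def map_message(raw: str) -> dict:
    """Mimic the example MAPPING: extract fields from one JSON
    message and cast them to the target column types."""
    jdata = json.loads(raw)
    return {
        "customer_id": int(jdata["cust_id"]),            # (jdata->>'cust_id')::int
        "month": int(jdata["month"]),                    # (jdata->>'month')::int
        "amount_paid": Decimal(str(jdata["expenses"])),  # (jdata->>'expenses')::decimal
    }

row = map_message('{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }')
print(row)
```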
Create the target Greenplum Database table named `json_from_kafka`. For example:

```shell
gpcoord$ psql -d testdb
testdb=# CREATE TABLE json_from_kafka( customer_id int8, month int4, amount_paid decimal(9,2) );
```
Exit the `psql` subsystem:

```shell
testdb=# \q
```
Submit the Kafka data load job to the GPSS instance running on port number 5019. (You may consider opening a new terminal window to run the command.) For example, to submit a job named `kafkajson2gp`:

```shell
gpcoord$ gpsscli submit --name kafkajson2gp --gpss-port 5019 ./jsonload_cfg.yaml
20200804 12:54:19.25262,116652,info,JobID: d577cf37890b5b6bf4e713a9586e86c9,JobName: kafkajson2gp
```
List all GPSS jobs. For example:

```shell
gpcoord$ gpsscli list --all --gpss-port 5019
JobName       JobID                             GPHost     GPPort  DataBase  Schema  Table            Topic               Status
kafkajson2gp  d577cf37890b5b6bf4e713a9586e86c9  localhost  5432    testdb    public  json_from_kafka  topic_json_gpkafka  JOB_SUBMITTED
```
The `list` subcommand displays all jobs. Notice the entry for the `kafkajson2gp` job that you just submitted, and that the job is in the Submitted state.

Start the job named `kafkajson2gp`. For example:

```shell
gpcoord$ gpsscli start kafkajson2gp --gpss-port 5019
20200804 12:57:57.35153,117918,info,Job kafkajson2gp is started
```
Stop the job named `kafkajson2gp`. For example:

```shell
gpcoord$ gpsscli stop kafkajson2gp --gpss-port 5019
20200804 13:05:09.24280,117506,info,stop job: kafkajson2gp success
```
Examine the `gpss` command output and log file, looking for messages that identify the number of rows inserted/rejected. For example:

```
... -[INFO]:- ... Inserted 9 rows
... -[INFO]:- ... Rejected 0 rows
```
View the contents of the Greenplum Database target table `json_from_kafka`:

```shell
gpcoord$ psql -d testdb
testdb=# SELECT * FROM json_from_kafka WHERE customer_id='1313131' ORDER BY amount_paid;
 customer_id | month | amount_paid
-------------+-------+-------------
     1313131 |    11 |      368.27
     1313131 |    10 |      492.83
     1313131 |    12 |     1313.13
(3 rows)
```
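The query result above is fully determined by the sample data. This Python sketch filters and sorts the sample records the same way the `SELECT` does, so you can predict the expected rows before (or without) running the load:

```python
import json

# The nine sample messages published to the topic earlier.
sample = [
    '{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }',
    '{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }',
    '{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }',
    '{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }',
    '{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }',
    '{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }',
    '{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }',
    '{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }',
    '{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }',
]

# Equivalent of: WHERE customer_id = 1313131 ORDER BY amount_paid
rows = sorted(
    (m for m in map(json.loads, sample) if m["cust_id"] == 1313131),
    key=lambda m: m["expenses"],
)
for m in rows:
    print(m["cust_id"], m["month"], m["expenses"])
```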