Tanzu Greenplum Streaming Server 1.11

Loading JSON Data from Kafka Using gpsscli

Last Updated February 26, 2025

This example uses the Tanzu Greenplum Streaming Server (GPSS) client utility, gpsscli, rather than the gpkafka utility, to load JSON-format data from Kafka into VMware Tanzu Greenplum.

In this example, you load JSON-format data from a Kafka topic named topic_json_gpkafka into a Tanzu Greenplum table named json_from_kafka. You perform the load as the Greenplum role gpadmin. The table json_from_kafka resides in the public schema of a Tanzu Greenplum database named testdb.

A producer of the Kafka topic_json_gpkafka topic emits customer expense messages in JSON format that include the customer identifier (integer), the month (integer), and an expense amount (decimal). For example, a message for a customer with identifier 123 who spent $456.78 in the month of September follows:

{ "cust_id": 123, "month": 9, "expenses": 456.78 }

You will run a Kafka console producer to emit JSON-format customer expense messages, start a Tanzu Greenplum Streaming Server (GPSS) instance, and use the gpsscli subcommands to load the data into the json_from_kafka table.

Prerequisites

Before you start this procedure, ensure that you:

  • Have administrative access to running Kafka and Tanzu Greenplum clusters.
  • Have configured connectivity as described in both the Tanzu Greenplum Streaming Server Prerequisites section and the Kafka Prerequisites.
  • Have identified and noted the ZooKeeper hostname and port.
  • Have identified and noted the hostname and port of the Kafka broker(s).
  • Have identified and noted the hostname and port of the Tanzu Greenplum coordinator node.
  • Have registered the GPSS extension.

This procedure assumes that you have installed the Apache Kafka distribution. If you are using a different Kafka distribution, you may need to adjust certain commands in the procedure.

Procedure

  1. Log in to a host in your Kafka cluster. For example:

    $ ssh kafkauser@kafkahost
    kafkahost$ 
    
  2. Create a Kafka topic named topic_json_gpkafka. For example:

    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-topics.sh --create \
        --zookeeper localhost:2181 --replication-factor 1 --partitions 1 \
        --topic topic_json_gpkafka
    
  3. Open a file named sample_data.json in the editor of your choice. For example:

    kafkahost$ vi sample_data.json
    
  4. Copy and paste the following JSON-format data into the file, and then save and exit:

    { "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
    { "cust_id": 3535353, "month": 11, "expenses": 761.35 }
    { "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
    { "cust_id": 7979797, "month": 11, "expenses": 18.72 }
    { "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
    { "cust_id": 7979797, "month": 12, "expenses": 173.18 }
    { "cust_id": 1313131, "month": 10, "expenses": 492.83 }
    { "cust_id": 3535353, "month": 12, "expenses": 81.12 }
    { "cust_id": 1313131, "month": 11, "expenses": 368.27 }
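
If you prefer not to use an editor, a here-document can create the same file. The following is a minimal sketch, assuming a POSIX shell; it writes the nine messages above to sample_data.json in the current directory and prints the line count.

```shell
# Write the nine sample messages to sample_data.json without opening an editor.
# The quoted 'EOF' delimiter keeps the shell from expanding anything in the body.
cat > sample_data.json <<'EOF'
{ "cust_id": 1313131, "month": 12, "expenses": 1313.13 }
{ "cust_id": 3535353, "month": 11, "expenses": 761.35 }
{ "cust_id": 7979797, "month": 10, "expenses": 4489.00 }
{ "cust_id": 7979797, "month": 11, "expenses": 18.72 }
{ "cust_id": 3535353, "month": 10, "expenses": 6001.94 }
{ "cust_id": 7979797, "month": 12, "expenses": 173.18 }
{ "cust_id": 1313131, "month": 10, "expenses": 492.83 }
{ "cust_id": 3535353, "month": 12, "expenses": 81.12 }
{ "cust_id": 1313131, "month": 11, "expenses": 368.27 }
EOF

# The console producer sends one message per input line, so the line count
# should equal the number of messages you expect to publish.
wc -l < sample_data.json
```

Keeping each JSON object on a single line matters here, because the console producer treats every line of input as a separate message.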
    
  5. Stream the contents of the sample_data.json file to a Kafka console producer. For example:

    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-producer.sh \
        --broker-list localhost:9092 \
        --topic topic_json_gpkafka < sample_data.json
    
  6. Verify that the Kafka console producer published the messages to the topic by running a Kafka console consumer. For example:

    kafkahost$ $KAFKA_INSTALL_DIR/bin/kafka-console-consumer.sh \
        --bootstrap-server localhost:9092 --topic topic_json_gpkafka \
        --from-beginning
    
  7. Open a new terminal window, log in to the Tanzu Greenplum coordinator host as the gpadmin administrative user, and set up the Greenplum environment. For example:

    $ ssh gpadmin@gpcoord
    gpcoord$ . /usr/local/greenplum-db/greenplum_path.sh
    
  8. Construct the Tanzu Greenplum Streaming Server configuration file. For example, open a file named gpsscfg_ex.json in the editor of your choice:

    gpcoord$ vi gpsscfg_ex.json
    
  9. Designate a GPSS listen port number of 5019 and a gpfdist port number of 8319 in the configuration file. For example, copy/paste the following into the gpsscfg_ex.json file, and then save and exit the editor:

    {
        "ListenAddress": {
            "Host": "",
            "Port": 5019
        },
        "Gpfdist": {
            "Host": "",
            "Port": 8319
        }
    }
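
As an alternative to editing the file by hand, the configuration above could be generated from shell variables. This is only a sketch, assuming a POSIX shell; the variable names GPSS_PORT and GPFDIST_PORT are illustrative and are not read by GPSS itself.

```shell
# Port values from the configuration above; change them to suit your environment.
GPSS_PORT=5019
GPFDIST_PORT=8319

# Unquoted EOF so that the shell substitutes the two variables into the JSON.
cat > gpsscfg_ex.json <<EOF
{
    "ListenAddress": {
        "Host": "",
        "Port": ${GPSS_PORT}
    },
    "Gpfdist": {
        "Host": "",
        "Port": ${GPFDIST_PORT}
    }
}
EOF
```

One advantage of this approach is that the same GPSS_PORT variable can be passed to the --gpss-port option in later gpsscli commands, which keeps the port number in one place.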
    
  10. Start the Tanzu Greenplum Streaming Server instance in the background, specifying the log directory ./gpsslogs. For example:

    gpcoord$ gpss --config gpsscfg_ex.json --log-dir ./gpsslogs & 
    
  11. Construct the load configuration file. Open a file named jsonload_cfg.yaml in the editor of your choice. For example:

    gpcoord$ vi jsonload_cfg.yaml
    
  12. Fill in the load configuration parameter values based on your environment. This example assumes:

    • Your Tanzu Greenplum coordinator hostname is gpcoord.
    • The Tanzu Greenplum server is running on the default port.
    • Your Kafka broker host and port are localhost:9092.
    • You want to write the Kafka data to a Tanzu Greenplum table named json_from_kafka located in the public schema of a database named testdb.
    • You want to write the customer identifier, month, and expenses data to Greenplum.

    The jsonload_cfg.yaml file would include the following contents:

    DATABASE: testdb
    USER: gpadmin
    HOST: gpcoord
    PORT: 5432
    KAFKA:
       INPUT:
         SOURCE:
            BROKERS: localhost:9092
            TOPIC: topic_json_gpkafka
         COLUMNS:
            - NAME: jdata
              TYPE: json
         FORMAT: json
         ERROR_LIMIT: 10
       OUTPUT:
         TABLE: json_from_kafka
         MAPPING:
            - NAME: customer_id
              EXPRESSION: (jdata->>'cust_id')::int
            - NAME: month
              EXPRESSION: (jdata->>'month')::int
            - NAME: amount_paid
              EXPRESSION: (jdata->>'expenses')::decimal
       COMMIT:
         MINIMAL_INTERVAL: 2000
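
The MAPPING expressions above read the keys cust_id, month, and expenses from each message. Because the PostgreSQL ->> operator returns NULL when a key is absent, a misspelled key would likely surface as NULL column values rather than as a load error. The following pre-flight check is a rough sketch, assuming a POSIX shell and one JSON object per line; check_keys is a hypothetical helper name, and it performs a plain text match rather than a real JSON parse.

```shell
# check_keys FILE KEY...: report any KEY that is absent from some line of FILE.
# This is a plain text match on "KEY":, not a real JSON parse.
check_keys() {
    file=$1
    shift
    status=0
    for key in "$@"; do
        # grep -c -v counts the lines that do NOT contain the key;
        # "|| true" absorbs grep's non-zero exit when that count is 0.
        missing=$(grep -c -v "\"$key\"[[:space:]]*:" "$file" || true)
        if [ "$missing" -ne 0 ]; then
            echo "$key: missing from $missing line(s)"
            status=1
        fi
    done
    return "$status"
}
```

For example, on the Kafka host you might run check_keys sample_data.json cust_id month expenses before streaming the file; the function prints nothing and returns 0 when every line contains all three keys.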
    
  13. Create the target Tanzu Greenplum table named json_from_kafka. For example:

    gpcoord$ psql -d testdb
    
    testdb=# CREATE TABLE json_from_kafka( customer_id int8, month int4, amount_paid decimal(9,2) );
    
  14. Exit the psql subsystem:

    testdb=# \q
    
  15. Submit the Kafka data load job to the GPSS instance running on port number 5019. (You may consider opening a new terminal window to run the command.) For example, to submit a job named kafkajson2gp:

    gpcoord$ gpsscli submit --name kafkajson2gp --gpss-port 5019 ./jsonload_cfg.yaml
    20200804 12:54:19.25262,116652,info,JobID: d577cf37890b5b6bf4e713a9586e86c9,JobName: kafkajson2gp
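
When scripting later steps, it can help to capture the job ID from the submit output. The sketch below assumes that the output keeps the comma-separated form shown above; extract_job_id is a hypothetical helper name and is not part of gpsscli.

```shell
# extract_job_id: read gpsscli submit output on stdin and print the JobID value.
# Assumes a line of the form "...,JobID: <hex string>,JobName: <name>".
extract_job_id() {
    sed -n 's/.*JobID: \([0-9a-f]*\),.*/\1/p'
}
```

A possible use is gpsscli submit --name kafkajson2gp --gpss-port 5019 ./jsonload_cfg.yaml 2>&1 | extract_job_id, where the 2>&1 redirection covers the case in which the message is written to standard error.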
    
  16. List all GPSS jobs. For example:

    gpcoord$ gpsscli list --all --gpss-port 5019
    JobName          JobID                               GPHost       GPPort   DataBase    Schema      Table              Topic               Status  
    kafkajson2gp     d577cf37890b5b6bf4e713a9586e86c9    localhost    5432     testdb      public      json_from_kafka    topic_json_gpkafka  JOB_SUBMITTED
    

    The list subcommand displays all jobs. Notice the entry for the kafkajson2gp job that you just submitted, and that the job is in the Submitted state (JOB_SUBMITTED).
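
To check the state of a single job from a script, the list output can be filtered by job name. This is a minimal sketch, assuming the whitespace-separated column layout shown above, with the job name in the first column and the status in the last; job_status is a hypothetical helper name.

```shell
# job_status NAME: read gpsscli list output on stdin and print the Status
# column (the last field) of the row whose first field equals NAME.
job_status() {
    awk -v job="$1" '$1 == job { print $NF }'
}
```

For example, gpsscli list --all --gpss-port 5019 | job_status kafkajson2gp would print the status value for that job, based on the listing above.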

  17. Start the job named kafkajson2gp. For example:

    gpcoord$ gpsscli start kafkajson2gp --gpss-port 5019
    20200804 12:57:57.35153,117918,info,Job kafkajson2gp is started
    
  18. Stop the job named kafkajson2gp. For example:

    gpcoord$ gpsscli stop kafkajson2gp --gpss-port 5019
    20200804 13:05:09.24280,117506,info,stop job: kafkajson2gp success
    
  19. Examine the gpss command output and log file, looking for messages that identify the number of rows inserted/rejected. For example:

    ... -[INFO]:- ... Inserted 9 rows
    ... -[INFO]:- ... Rejected 0 rows
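
If the log contains several such messages, the counts can be totalled with standard tools. The sketch below assumes only that each message contains the text "Inserted N rows" or "Rejected N rows", as in the excerpt above; the rest of the log line format is not assumed, and sum_rows is a hypothetical helper name.

```shell
# sum_rows WORD: add up the N in every "WORD N rows" message read from stdin.
# Prints 0 when no matching message is found.
sum_rows() {
    sed -n "s/.*$1 \([0-9][0-9]*\) rows.*/\1/p" |
        awk '{ total += $1 } END { print total + 0 }'
}
```

For example, sum_rows Inserted < "$LOGFILE" and sum_rows Rejected < "$LOGFILE", where LOGFILE is a placeholder for the path of a GPSS log file under ./gpsslogs.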
    
  20. View the contents of the Tanzu Greenplum target table json_from_kafka:

    gpcoord$ psql -d testdb
    
    testdb=# SELECT * FROM json_from_kafka WHERE customer_id = 1313131
               ORDER BY amount_paid;
     customer_id | month | amount_paid 
    -------------+-------+-------------
         1313131 |    11 |      368.27
         1313131 |    10 |      492.83
         1313131 |    12 |     1313.13
    (3 rows)
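
To cross-check the query result against the source data, the same filter and ordering can be applied to sample_data.json on the Kafka host. This is a rough sketch, assuming a POSIX shell and the exact key order and spacing used in the sample file; expected_rows is a hypothetical helper name.

```shell
# expected_rows CUST_ID FILE: print "month amount" for one customer,
# lowest amount first, using the field layout of sample_data.json.
expected_rows() {
    grep "\"cust_id\": $1," "$2" |
        sed 's/.*"month": \([0-9]*\), "expenses": \([0-9.]*\).*/\1 \2/' |
        LC_ALL=C sort -k2,2n
}
```

Running expected_rows 1313131 sample_data.json should list the same month and amount pairs, in the same order, as the month and amount_paid columns of the query output above.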