The GPSS RabbitMQ data source loads data from a RabbitMQ queue (the traditional AMQP implementation) or a RabbitMQ stream (persistent and replicated structure available in RabbitMQ version 3.9 and later) into Greenplum Database.
You can use the gpsscli utility to load RabbitMQ message data into Greenplum Database. The GPSS server, gpss, is a RabbitMQ consumer. It ingests streaming data from a single RabbitMQ queue or stream, using Greenplum Database readable external tables to transform and insert or update the data into a target Greenplum table. You identify the RabbitMQ server, queue or stream name, virtual host, data format, and the Greenplum connection options and target table definition in a YAML-formatted load configuration file that you provide to the utility.
Load Procedure
You will perform the following tasks when you use the Greenplum Streaming Server to load RabbitMQ message data into a Greenplum Database table:
- Ensure that you meet the Prerequisites.
- Register the Greenplum Streaming Server extension.
- Identify the format of the RabbitMQ data.
- Construct the load configuration file.
- Create the target Greenplum Database table.
- Assign Greenplum Database role permissions to the table, if required.
- Run the gpsscli Client Commands to load the data into Greenplum Database.
- Check for load errors.
Prerequisites
Before using the gpsscli utility to load RabbitMQ data to Greenplum Database, ensure that you:
- Meet the Prerequisites documented for the Greenplum Streaming Server, and configure and start the server.
- Have access to a running RabbitMQ cluster, and that you can identify the hostname and port number of the RabbitMQ server serving the data.
- Can identify the name of the RabbitMQ queue or stream of interest.
- Can run the command on a host that has connectivity to:
- Each RabbitMQ host in the RabbitMQ cluster.
- The Greenplum Database coordinator and all segment hosts.
About Supported Message Data Formats
The Greenplum Streaming Server supports RabbitMQ message value data in the following formats:
Format | Description |
---|---|
binary | Binary format data. GPSS reads binary data from RabbitMQ only as a single bytea-type column. |
csv | Comma-delimited text format data. |
custom | Data of a custom format, parsed by a custom formatter function. |
delimited | Text data separated by a configurable delimiter. |
json, jsonl (version 2 only) | JSON- or JSONB-format data. Specify the json format when the file is in JSON or JSONB format. GPSS can read JSON data as a single object or can read a single JSON record per line. You must define a mapping if you want GPSS to write the data into specific columns in the target Greenplum Database table. Note: GPSS supports JSONB-format data only when loading to Greenplum 6. Note: Specify FORMAT: jsonl in version 2 format load configuration files. Specify json with is_jsonl: true in version 3 (Beta) format load configuration files. |
To write RabbitMQ message data into a Greenplum Database table, you must identify the data format in the load configuration file.
Binary
Use the binary format when your RabbitMQ message data is a stream of bytes. GPSS reads binary data from RabbitMQ and loads it into a single bytea-type column.
CSV
Use the csv format when your RabbitMQ message data is comma-delimited text and conforms to RFC 4180. The message content may not contain line ending characters (CR and LF).
Data in csv format may appear in RabbitMQ messages as follows:
"1313131","12","backorder","1313.13"
"3535353","11","shipped","761.35"
"7979797","11","partial","18.72"
Custom
The Greenplum Streaming Server provides a custom data formatter plug-in framework for RabbitMQ messages using user-defined functions. The type of RabbitMQ message data processed by a custom formatter is formatter-specific. For example, a custom formatter may process compressed or complex data.
Delimited Text
The Greenplum Streaming Server supports loading RabbitMQ data delimited by one or more characters that you specify. Use the delimited format for such data. The delimiter may be a multi-byte value up to 32 bytes in length. You can also specify quote and escape characters, and an end-of-line prefix. The delimiter may not contain the quote or escape characters.
When you specify a quote character:
- The left and right quotes are the same.
- Each data element must be quoted. GPSS does not support mixed quoted and unquoted content.
- You must also define an escape character.
- GPSS keeps the original format of any character between the quotes, except the quote and escape characters. This especially applies to the delimiter and \n, which do not require additional escaping when they are quoted.
- The quote character is represented as the escape character plus the quote character (for example, \").
- The escape character is represented as the escape character plus the escape character (for example, \\).
- GPSS parses multiple escape characters from left to right.
When you do not specify a quote character:
- The escape character is optional.
- If you do not specify an escape character, GPSS treats the delimiter as the column separator, and treats any end-of-line prefix plus \n as the row separator.
- If you do specify an escape character:
  - GPSS uses the escape character plus the delimiter as the column separator.
  - GPSS uses the escape character plus the end-of-line prefix plus \n as the row separator.
  - The escape character plus the escape character represents the escape character itself.
  - GPSS parses multiple escape characters from left to right.
Sample data using a pipe ('|') delimiter character follows:
1313131|12|backorder|1313.13
3535353|11|shipped|761.35
7979797|11|partial|18.72
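The pipe-delimited sample above might be described with an INPUT:DATA fragment like the following. This is a sketch; the column names are illustrative, and you should verify the delimiter-related property names (such as DELIMITED_OPTION and DELIMITER) against the rabbitmq-v2.yaml reference page:

```yaml
# Sketch: declaring the pipe-delimited sample rows (column names hypothetical).
DATA:
  COLUMNS:
    - NAME: cust_id
      TYPE: int
    - NAME: month
      TYPE: int
    - NAME: status
      TYPE: text
    - NAME: amount
      TYPE: decimal
  FORMAT: delimited
  DELIMITED_OPTION:
    DELIMITER: "|"   # verify this property path for your GPSS version
```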
JSON (single object)
Specify the json format when your RabbitMQ message data is in JSON or JSONB format and you want GPSS to read the data from RabbitMQ as a single object into a single column (per the JSON specification, newlines and white space are ignored). You must define a mapping if you want GPSS to write the data into specific columns in the target Greenplum Database table.
GPSS supports JSONB-format data only when loading to Greenplum 6.
JSON (single record per line)
Specify FORMAT: jsonl in version 2 format load configuration files, or specify json with is_jsonl: true in version 3 (Beta) format load configuration files, when your RabbitMQ message data is in JSON format with a single JSON record per line. You must define a mapping if you want GPSS to write the data into specific columns in the target Greenplum Database table.
Sample JSON message data:
{ "cust_id": 1313131, "month": 12, "amount_paid":1313.13 }
{ "cust_id": 3535353, "month": 11, "amount_paid":761.35 }
{ "cust_id": 7979797, "month": 11, "amount_paid":18.82 }
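For the sample records above, a version 2 configuration might declare the message as a single json-type column and map its fields to target table columns. This fragment is a sketch; the input column name jdata and the target table name are hypothetical:

```yaml
# Version 2 fragment (sketch): read each line as one JSON record,
# then map its fields to target table columns.
DATA:
  COLUMNS:
    - NAME: jdata
      TYPE: json
  FORMAT: jsonl
OUTPUT:
  TABLE: cust_payments              # hypothetical target table
  MAPPING:
    - NAME: cust_id
      EXPRESSION: (jdata->>'cust_id')::int
    - NAME: month
      EXPRESSION: (jdata->>'month')::int
    - NAME: amount_paid
      EXPRESSION: (jdata->>'amount_paid')::decimal
```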
Registering a Custom Formatter
A custom data formatter for RabbitMQ messages is a user-defined function. If you are using a custom formatter, you must create the formatter function and register it in each database in which you will use the function to write RabbitMQ data to Greenplum tables.
Constructing the rabbitmq.yaml Configuration File
You configure a data load operation from RabbitMQ to Greenplum Database via a YAML-formatted configuration file. This configuration file includes parameters that identify the source RabbitMQ data and information about the Greenplum Database connection and target table, as well as error and commit thresholds for the operation.
When loading from RabbitMQ, the Greenplum Streaming Server supports two versions of the YAML configuration file: version 2 and version 3 (Beta).
Refer to the rabbitmq-v2.yaml reference page for Version 2 configuration file format and the configuration parameters that this version supports. rabbitmq-v3.yaml describes the Version 3 (Beta) format.
Contents of a sample gpsscli Version 2 YAML configuration file named loadcfgrmq2.yaml follow:
DATABASE: testdb
USER: gpadmin
PASSWORD: password
HOST: localhost
PORT: 15432
VERSION: 2
RABBITMQ:
INPUT:
SOURCE:
SERVER: gpadmin:changeme@localhost:5552
STREAM: test_stream
VIRTUALHOST: vhost_for_gpss
DATA:
COLUMNS:
- NAME: c1
TYPE: int
- NAME: c2
TYPE: int
- NAME: path
TYPE: text
FORMAT: CSV
OUTPUT:
SCHEMA: "public"
TABLE: tbl_int_text_column
MODE: MERGE
MATCH_COLUMNS:
- c1
UPDATE_COLUMNS:
- c2
ORDER_COLUMNS:
- path
UPDATE_CONDITION: c2 = 11
DELETE_CONDITION: c1 = 0
MAPPING:
- NAME: c1
EXPRESSION: c1::int
- NAME: c2
EXPRESSION: c2::int
- NAME: path
EXPRESSION: path::text
METADATA:
SCHEMA: staging_schema
COMMIT:
MINIMAL_INTERVAL: 200
CONSISTENCY: at-least
PROPERTIES:
eof.when.idle: 1500
Greenplum Database Options (Version 2-Focused)
You identify the Greenplum Database connection options via the DATABASE, USER, PASSWORD, HOST, and PORT parameters.
The VERSION parameter identifies the version of the GPSS YAML configuration file. You must specify 2 or v3.
RABBITMQ:INPUT Options
Specify the RabbitMQ server, virtual host, and queue or stream of interest using the SOURCE block.
The DATA block that you provide must specify the COLUMNS and FORMAT parameters. The DATA:COLUMNS block includes the name and type of each data element in the RabbitMQ message. The default source-to-target data mapping behavior of GPSS is to match a column name as defined in COLUMNS:NAME with a column name in the target Greenplum Database OUTPUT:TABLE:
- You must identify the RabbitMQ data elements in the order in which they appear in the RabbitMQ message.
- You may specify NAME: __IGNORED__ to omit a RabbitMQ message value data element from the load operation.
- You must provide the same name for each non-ignored RabbitMQ data element and its associated Greenplum Database table column.
- You must specify an equivalent data type for each non-ignored RabbitMQ data element and its associated Greenplum Database table column.
The DATA:FORMAT keyword identifies the format of the RabbitMQ message value. GPSS supports comma-delimited text format (csv) and data that is separated by a configurable delimiter (delimited). GPSS also supports binary (binary), single object or single record per line JSON/JSONB (json or jsonl), and custom (custom) format value data.
When you provide a META block, you must specify a single JSON-type column in COLUMNS, and you must specify FORMAT: json.
The available RabbitMQ metadata properties for a streaming source include:
- stream (text) - the RabbitMQ stream name
- offset (bigint) - the message offset
The available RabbitMQ metadata properties for a queue source include:
- queue (text) - the RabbitMQ queue name
- messageId (text) - the message identifier
- correlationId (text) - the correlation identifier
- timestamp (bigint) - the time that the message was added to the RabbitMQ queue
The FILTER parameter identifies a filter to apply to the RabbitMQ input messages before the data is loaded into Greenplum Database. If the filter evaluates to true, GPSS loads the message. The message is dropped if the filter evaluates to false. The filter string must be a valid SQL conditional expression and may reference one or more DATA column names.
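For example, using the c2 column name from the sample configuration, a filter that loads only messages whose c2 value exceeds 10 might look like this (illustrative):

```yaml
# Illustrative FILTER entry within the RABBITMQ:INPUT block;
# c2 is a DATA column name from the sample configuration.
FILTER: c2 > 10
```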
The ERROR_LIMIT parameter identifies the number of errors or the error percentage threshold after which GPSS should exit the load operation. The default ERROR_LIMIT is zero; GPSS stops the load operation when it encounters the first error.
RABBITMQ:OUTPUT Options
You identify the target Greenplum Database schema name and table name via the RABBITMQ:OUTPUT: SCHEMA and TABLE parameters. You must pre-create the Greenplum Database table before you attempt to load RabbitMQ data.
GPSS supports loading from a RabbitMQ data source into a single Greenplum Database table only.
The default load mode is to insert RabbitMQ data into the Greenplum Database table. GPSS also supports updating and merging RabbitMQ message data into a Greenplum table. You specify the load MODE, the MATCH_COLUMNS and UPDATE_COLUMNS, and any UPDATE_CONDITIONs that must be met to merge or update the data. In MERGE MODE, you can also specify ORDER_COLUMNS to filter out duplicates and a DELETE_CONDITION.
You can override the default mapping of the INPUT DATA:COLUMNS by specifying a MAPPING block in which you identify the association between a specific column in the target Greenplum Database table and a RabbitMQ message value data element.
When you specify a MAPPING block, ensure that you provide entries for all RabbitMQ data elements of interest - GPSS does not automatically match column names when you provide a MAPPING.
About the Merge Load Mode
MERGE mode is similar to an UPSERT operation; GPSS may insert new rows in the database, or may update an existing database row that satisfies match and update conditions. GPSS deletes rows in MERGE mode when the data satisfies an optional DELETE_CONDITION that you specify.
GPSS stages a merge operation in a temporary table, generating the SQL to populate the temp table from the set of OUTPUT configuration properties that you provide.
GPSS uses the following algorithm for MERGE mode processing:
- Create a temporary table like the target table.
- Generate the SQL to insert the source data into the temporary table:
  - Add the MAPPINGS.
  - Add the FILTER.
  - Use MATCH_COLUMNS and ORDER_COLUMNS to filter out duplicates.
- Update the target table from rows in the temporary table that satisfy MATCH_COLUMNS, UPDATE_COLUMNS, and UPDATE_CONDITION.
- Insert non-matching rows into the target table.
- Delete rows in the target table that satisfy MATCH_COLUMNS and the DELETE_CONDITION.
- Truncate the temporary table.
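Using the sample configuration's table and columns, the staged merge is roughly equivalent to the following SQL. This is an illustrative sketch only, not the exact SQL that GPSS generates:

```sql
-- Sketch of the MERGE staging flow for the sample configuration.
CREATE TEMP TABLE stage (LIKE public.tbl_int_text_column);

-- (GPSS inserts the mapped, filtered, de-duplicated source rows into stage.)

-- Update matching rows that satisfy the UPDATE_CONDITION.
UPDATE public.tbl_int_text_column t
   SET c2 = s.c2            -- UPDATE_COLUMNS
  FROM stage s
 WHERE t.c1 = s.c1          -- MATCH_COLUMNS
   AND s.c2 = 11;           -- UPDATE_CONDITION

-- Insert rows with no match in the target table.
INSERT INTO public.tbl_int_text_column
SELECT s.*
  FROM stage s
 WHERE NOT EXISTS (
   SELECT 1 FROM public.tbl_int_text_column t WHERE t.c1 = s.c1
 );

-- Delete matching rows that satisfy the DELETE_CONDITION.
DELETE FROM public.tbl_int_text_column t
 USING stage s
 WHERE t.c1 = s.c1          -- MATCH_COLUMNS
   AND t.c1 = 0;            -- DELETE_CONDITION

TRUNCATE stage;
```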
Other Options
The RABBITMQ:METADATA:SCHEMA parameter specifies the name of the Greenplum Database schema in which GPSS creates external tables.
GPSS commits RabbitMQ data to the Greenplum Database table at the row and/or time intervals that you specify in the RABBITMQ:COMMIT: MAX_ROW and/or MINIMAL_INTERVAL parameters. If you do not specify these properties, GPSS commits data at the default MINIMAL_INTERVAL, 5000 ms.
Specify a RABBITMQ:PROPERTIES block to set RabbitMQ configuration properties. GPSS sends the property names and values to RabbitMQ when it instantiates a consumer for the load operation.
About the JSON Format and Column Type
When you specify FORMAT: json or FORMAT: jsonl, valid COLUMN:TYPEs for the data include json or jsonb. You can also specify the new GPSS gp_jsonb (Beta) or gp_json (Beta) column types.
- gp_jsonb is an enhanced JSONB type that adds support for \u escape sequences and unicode. For example, gp_jsonb can escape \uDD8B and \u0000 as text format, but jsonb treats these characters as illegal.
- gp_json is an enhanced JSON type that can tolerate certain illegal unicode sequences. For example, gp_json automatically escapes incorrect surrogate pairs and processes \u0000 as \\u0000. Note that unicode escape values cannot be used for code point values above 007F when the server encoding is not UTF8.
You can use the gp_jsonb (Beta) and gp_json (Beta) data types as follows:
- As the COLUMN:TYPE when the target Greenplum Database table column type is json or jsonb.
- In a MAPPING when the target Greenplum Database column is text or varchar. For example: EXPRESSION: (j->>'a')::text
- In a MAPPING when FORMAT: avro and the target Greenplum Database column is json or jsonb. For example: EXPRESSION: j::gp_jsonb or EXPRESSION: j::gp_json
- In a MAPPING when FORMAT: avro and the target Greenplum Database column is text or varchar. For example: EXPRESSION: (j::gp_jsonb->>'a')::text or EXPRESSION: (j::gp_json->>'a')::text
The gp_jsonb (Beta) and gp_json (Beta) data types are defined in an extension named dataflow. You must run CREATE EXTENSION dataflow; in each database in which you choose to use these (Beta) data types.
Preserving Ill-Formed JSON Escape Sequences
GPSS exposes a configuration parameter that you can use with the gp_jsonb and gp_json types. The name of this parameter is gpss.json_preserve_ill_formed_prefix. When set, GPSS does not return an error when it encounters an ill-formed JSON escape sequence with these types, but instead prepends it with the prefix that you specify.
For example, if gpss.json_preserve_ill_formed_prefix is set to the string "##" as follows:
SET gpss.json_preserve_ill_formed_prefix = "##";
and GPSS encounters an ill-formed JSON sequence such as the orphaned low surrogate \ude04X, GPSS writes the data as ##\ude04X instead.
About Transforming and Mapping RabbitMQ Input Data
You can define a MAPPING between the RabbitMQ input data (DATA:COLUMNS and META:COLUMNS) and the columns in the target Greenplum Database table. Defining a mapping may be useful when you have a multi-field input column (such as a JSON-type column), and you want to assign individual components of the input field to specific columns in the target table.
You might also use a MAPPING to assign a value expression to a target table column. The expression must be one that you could specify in the SELECT list of a query, and can include a constant value, a column reference, an operator invocation, a built-in or user-defined function call, and so forth.
Creating the Greenplum Table
You must pre-create the Greenplum table before you load RabbitMQ data into Greenplum Database. You use the RABBITMQ:OUTPUT: SCHEMA and TABLE load configuration file parameters to identify the schema and table names.
The target Greenplum table definition must include each column that GPSS will load into the table. The table definition may include additional columns; GPSS ignores these columns, and loads no data into them.
The name and data type that you specify for a column of the target Greenplum Database table must match the name and data type of the related, non-ignored RabbitMQ message element. If you have defined a column mapping, the name of the Greenplum Database column must match the target column name that you specified for the mapping, and the type must match the target column type or expression that you define.
The CREATE TABLE command for the target Greenplum Database table receiving the RabbitMQ message data defined in the loadcfgrmq2.yaml file presented in the Constructing the rabbitmq.yaml Configuration File section follows:
testdb=# CREATE TABLE public.tbl_int_text_column( c1 int8, c2 int8, path text );
About RabbitMQ Stream Offsets, Message Retention, and Loading
This topic applies only when reading from a RabbitMQ stream.
RabbitMQ assigns each record/message within a stream a unique sequential id number. This id is referred to as an offset. For each gpsscli load invocation specifying the same RabbitMQ stream and Greenplum Database table names, GPSS retains the last offset consumed by the load operation. Refer to Understanding RabbitMQ Message Offset Management for more detailed information about how GPSS manages RabbitMQ message offsets.
gpsscli load returns an error if its recorded offset for the RabbitMQ stream and Greenplum Database table combination is behind the current earliest RabbitMQ message offset for the stream, or when the earliest and latest offsets do not match.
When you receive one of these messages, you can choose to:
- Resume the load operation from the earliest available message published to the stream by specifying the --force-reset-earliest option to gpsscli load:
$ gpsscli load --force-reset-earliest loadcfg2.yaml
- Load only new messages published to the RabbitMQ stream by specifying the --force-reset-latest option with the command:
$ gpsscli load --force-reset-latest loadcfg2.yaml
- Load messages published since a specific timestamp (milliseconds since epoch) by specifying the --force-reset-timestamp option to gpsscli load:
$ gpsscli load --force-reset-timestamp 1571066212000 loadcfg2.yaml
Specifying the --force-reset-<xxx> options when loading data may result in missing or duplicate messages. Use of these options outside of the offset mismatch scenario is discouraged.
Alternatively, you can provide the FALLBACK_OPTION (version 2) or fallback_option (version 3 (Beta)) property in the load configuration file to instruct GPSS to automatically read from the specified offset when it detects a mismatch.
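For example, in a version 2 load configuration file the property might be set as follows. The placement within the RABBITMQ block and the supported values are documented on the rabbitmq-v2.yaml reference page; earliest here is illustrative:

```yaml
# Illustrative: instruct GPSS to fall back to the earliest available
# offset when it detects an offset mismatch.
FALLBACK_OPTION: earliest
```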