Commit 865923e

Readme improvement (#6)

mk-software-pl authored Oct 14, 2024
1 parent b3ccaf2 commit 865923e
Showing 1 changed file with 171 additions and 11 deletions.

README.md
@@ -77,6 +77,7 @@ volumes:
with Kafka sources. It's used to create topics and by the generator to generate example messages.
- `SCHEMA_REGISTRY_ADDRESS` - it contains the address (with port) of a Schema Registry service. You will need it when you want to run
streaming examples with Kafka sources. It's used to create schemas for Kafka topics.
- `FLINK_SQL_GATEWAY_ADDRESS` - it contains the address (with port) of the [Flink SQL Gateway](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql-gateway/overview/). You will need it when you want to run batch examples. It's used to create Flink tables and insert data.
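
For instance, in a Docker Compose setup these variables might be set on the Bootstrapper service roughly as below (the values are hypothetical and depend on your environment):

```yaml
# Hypothetical values; adjust host names and ports to your setup
environment:
  KAFKA_ADDRESS: "kafka:9092"
  SCHEMA_REGISTRY_ADDRESS: "schema-registry:8081"
  FLINK_SQL_GATEWAY_ADDRESS: "flink-sql-gateway:8083"
```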

#### Used by the `designer` service

@@ -120,6 +121,43 @@ You can disable scenario deployment (in fact, the scenario will be deployed but not activated), for
example by setting e.g. `LOAN_REQUEST_DEPLOY: false` - this ENV ensures that the `loan-request` scenario example is not
active when the data generation is started.
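
A hypothetical Docker Compose fragment (the `bootstrapper` service name is an assumption) could look like this:

```yaml
bootstrapper:
  environment:
    # the loan-request example will still be created, but it won't be activated
    LOAN_REQUEST_DEPLOY: "false"
```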

### Additional outside requirements

#### Flink SQL Gateway

Currently, the Bootstrapper communicates with Flink using the Flink SQL Gateway.
You can run the gateway like this:

```bash
./bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.rest.address=flink-jobmanager -Drest.address=flink-jobmanager
```
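
To quickly verify that the gateway is reachable, you can query its REST info endpoint (assuming the default REST port 8083); it should return the Flink product name and version:

```bash
# Assumes the SQL Gateway listens on localhost:8083 (the default REST port)
curl http://localhost:8083/v1/info
```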

<details>
<summary>SQL Gateway Docker Compose example</summary>

```yaml
services:
  flink-jobmanager:
    [...]
  flink-sql-gateway:
    image: flink:1.19.1-scala_2.12-java11
    restart: unless-stopped
    entrypoint: >
      /bin/sh -c "./bin/sql-gateway.sh start-foreground -Dsql-gateway.endpoint.rest.address=flink-jobmanager -Drest.address=flink-jobmanager"
    environment:
      JOB_MANAGER_RPC_ADDRESS: "flink-jobmanager"
    depends_on:
      flink-jobmanager:
        condition: service_healthy
    deploy:
      resources:
        limits:
          memory: 1024M
```
</details>

## What's underneath and how it works

### Scenario Examples Library
@@ -173,11 +211,14 @@ scenario-examples-library
│   │   │   │   └── {topic-01-name}.sh # script to generate message which will be sent to the topic "topic-01-name" (it will be called continuously)
│   │   │   └── static
│   │   │       └── {topic-01-name}.txt # list of messages which will be sent to topic "topic-01-name" (to send only once)
│   │   ├── http
│   │   │   ├── generated
│   │   │   │   └── {open-api-service-slug}.sh # script to generate request body which will be sent with POST request to /scenarios/{open-api-service-slug} service (it will be called continuously)
│   │   │   └── static
│   │   │       └── {open-api-service-slug}.txt # list of request bodies which will be sent with POST request to /scenarios/{open-api-service-slug} service (to send only once)
│   │   └── flink
│   │       └── static
│   │           └── {dml-script}.sql # file with Flink SQL scripts that provides some data to test
│   ├── mocks # mock definitions (optional)
│   │   └── db
│   │       ├── {db-schema-01-name}.sql # script with DDLs to import
@@ -201,9 +242,11 @@ scenario-examples-library
│   │   ├── nu-designer
│   │   │   ├── {some-configuration-01-name}.conf # it contains part of Nu configuration (it's HOCON file)
│   │   │   └── {some-configuration-02-name}.conf
│   │   ├── schema-registry
│   │   │   ├── {topic-01-name}.schema.json # it contains JSON schema definition for topic "topic-01-name"
│   │   │   └── {topic-02-name}.schema.json
│   │   └── flink
│   │       └── {ddl-script}.sql # it contains Flink SQL script to create Flink tables
└── {scenario-example-2} # the next scenario
   ├── [...]
```
@@ -213,6 +256,11 @@ scenario-examples-library
It's a representation of a Scenario in the form of JSON. Nu Designer should be able to import it. The name of the file with the scenario
is used by the Bootstrapper during empty scenario creation. It should be unique.

> ❗ The Bootstrapper is able to determine the scenario processing mode based on the scenario metadata from the JSON in the case of Streaming and
> Request-Response scenarios, but it cannot distinguish between a Streaming and a Batch scenario. So, to give the Bootstrapper a hint about the
> processing mode, you can add a `batch` suffix at the end of the JSON file name (e.g. `my-example-batch.json`). You can do the same for
> Streaming (by adding `streaming`) and Request-Response (by adding `request-response`) scenarios, but it's not required.
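
For illustration, scenario JSON files could be named like this (the names below are hypothetical; only the `batch` suffix is significant for the Bootstrapper):

```
detect-fraud-streaming.json         # Streaming - suffix optional, mode read from metadata
loan-request-request-response.json  # Request-Response - suffix optional, mode read from metadata
product-orders-report-batch.json    # Batch - suffix needed as a hint for the Bootstrapper
```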

### Scenario setup

#### Nu Designer configuration
Expand All @@ -239,6 +287,65 @@ OfferProposalsBasedOnCustomerEvents
For each defined topic you should provide a JSON schema. You add schemas in the `{scenario-name}/setup/schema-registry` folder. The name format
for a schema file is `{topic-01-name}.schema.json`. It means that the schema will be added for the topic `{topic-01-name}`.
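
As an illustration, a hypothetical `transactions.schema.json` (the topic and field names are invented for this example) could look like:

```json
{
  "type": "object",
  "properties": {
    "customerId": { "type": "string" },
    "amount": { "type": "number" }
  },
  "required": ["customerId", "amount"],
  "additionalProperties": false
}
```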

#### Flink tables

To create Flink tables you can use Flink SQL scripts placed in the `{scenario-name}/setup/flink` folder. Each script has to have the `*.sql` extension.

<details>
<summary>Example</summary>

`tables.sql` file:
```sql
CREATE CATALOG nessie_catalog WITH (
    'type' = 'iceberg',
    'catalog-impl' = 'org.apache.iceberg.nessie.NessieCatalog',
    'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
    'uri' = 'http://nessie:19120/api/v2',
    'warehouse' = 's3://warehouse',
    's3.endpoint' = 'http://minio:9000'
);
USE CATALOG nessie_catalog;
CREATE DATABASE IF NOT EXISTS `default`;

CREATE TABLE IF NOT EXISTS orders
(
    order_timestamp TIMESTAMP,
    order_date STRING,
    id BIGINT,
    customer_id BIGINT
) PARTITIONED BY (order_date);

CREATE TABLE IF NOT EXISTS products
(
    id BIGINT,
    name STRING,
    product_category TINYINT,
    current_prise DECIMAL(15, 2)
);

CREATE TABLE IF NOT EXISTS order_items
(
    id BIGINT,
    order_id BIGINT,
    product_id BIGINT,
    quantity BIGINT,
    unit_prise DECIMAL(15, 2)
);

CREATE TABLE IF NOT EXISTS product_orders_report
(
    report_id BIGINT,
    report_timestamp TIMESTAMP,
    report_year_month STRING,
    product_id BIGINT,
    product_quantity_sum BIGINT
) PARTITIONED BY (report_year_month);
```
</details>


### Scenario external services mocks

Some scenarios can use components that call external services like database or Open API HTTP service. In this case you need to provide
@@ -372,10 +479,9 @@ A script should echo a string (e.g. stringified JSON).

##### Static data

Static HTTP requests (payloads) are provided as a text file placed in the `{scenario-name}/data/http/static` folder. The URL
consists of a static path and a dynamic part taken from the name of the file (e.g. `loan.txt` contains requests that will be
sent to `http://$NU_REQUEST_RESPONSE_OPEN_API_SERVICE_ADDRESS/scenario/loan`). The file contains one request payload per line.

<details>
<summary>Example</summary>
@@ -391,3 +497,57 @@
{"customerId": "3", "requestedAmount": 2000, "requestType": "loan", "location": { "city": "Lublin", "street": "Głęboka" }}
```
</details>

#### Batch scenario

##### Dynamic data

Not supported yet.

##### Static data

You can load static data into Flink tables by providing Flink SQL DML scripts in the `{scenario-name}/data/flink/static` folder. Each
script file has to have the `*.sql` extension.

<details>
<summary>Example</summary>

`inserts.sql` file:
```sql
CREATE CATALOG nessie_catalog WITH (
    'type' = 'iceberg',
    'catalog-impl' = 'org.apache.iceberg.nessie.NessieCatalog',
    'io-impl' = 'org.apache.iceberg.aws.s3.S3FileIO',
    'uri' = 'http://nessie:19120/api/v2',
    'warehouse' = 's3://warehouse',
    's3.endpoint' = 'http://minio:9000'
);

USE CATALOG nessie_catalog;

INSERT INTO products (id, name, product_category, current_prise)
VALUES (1, 'Basic Widget', 0, 9.99),
       (2, 'Premium Widget', 1, 19.99),
       (3, 'Basic Gadget', 0, 14.99),
       (4, 'Premium Gadget', 1, 29.99),
       (5, 'Basic Tool', 0, 24.99),
       (6, 'Premium Tool', 1, 49.99);

INSERT INTO orders (order_timestamp, order_date, id, customer_id)
VALUES (TO_TIMESTAMP('2024-09-12 14:25:30'), '2024-09-12', 1001, 2001),
       (TO_TIMESTAMP('2024-09-12 15:10:45'), '2024-09-12', 1002, 2002),
       (TO_TIMESTAMP('2024-09-13 09:20:10'), '2024-09-13', 1003, 2003),
       (TO_TIMESTAMP('2024-09-13 10:30:05'), '2024-09-13', 1004, 2004),
       (TO_TIMESTAMP('2024-09-13 12:15:22'), '2024-09-13', 1005, 2001),
       (TO_TIMESTAMP('2024-09-14 14:50:00'), '2024-09-14', 1006, 2002);

INSERT INTO order_items (id, order_id, product_id, quantity, unit_prise)
VALUES (10001, 1001, 1, 2, 9.99),   -- Order 1001 contains 2 Basic Widgets
       (10002, 1001, 3, 1, 14.99),  -- Order 1001 also contains 1 Basic Gadget
       (10003, 1002, 2, 1, 19.99),  -- Order 1002 contains 1 Premium Widget
       (10004, 1003, 3, 3, 14.99),  -- Order 1003 contains 3 Basic Gadgets
       (10005, 1004, 4, 1, 29.99),  -- Order 1004 contains 1 Premium Gadget
       (10006, 1005, 5, 4, 24.99),  -- Order 1005 contains 4 Basic Tools
       (10007, 1006, 6, 2, 49.99);
```
</details>
