diff --git a/transfer/streaming/streaming-01-http-to-http/README.md b/transfer/streaming/streaming-01-http-to-http/README.md
index 9beee036..203e62d8 100644
--- a/transfer/streaming/streaming-01-http-to-http/README.md
+++ b/transfer/streaming/streaming-01-http-to-http/README.md
@@ -1,70 +1,92 @@
# Streaming HTTP to HTTP

-This sample will show how you can set up the EDC to stream messages from HTTP to HTTP.
+This sample will show how you can set up the Eclipse Dataspace Connector to stream messages from HTTP to HTTP.
This code is only for demonstration purposes and should not be used in production.

## Concept
-We will build a data-plane `DataSource` extension that will retrieve new data from a disk folder and push it
+
+We will build a data plane `DataSource` extension that will retrieve new data from a disk folder and push it
to every consumer that has started a `TransferProcess` for a related asset.

### Run

Build the connector runtime, which will be used both for the provider and consumer:
+
```shell
./gradlew :transfer:streaming:streaming-01-http-to-http:streaming-01-runtime:build
```

-Run the provider and the consumer, which must be started from different terminal shells:
+Run the provider and the consumer, each with its own configuration, from two different terminals.
+
+Provider:
+
```shell
-# provider
export EDC_FS_CONFIG=transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/provider.properties
java -jar transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/build/libs/connector.jar
+```
+
+Consumer:

-#consumer
+```shell
export EDC_FS_CONFIG=transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/consumer.properties
java -jar transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/build/libs/connector.jar
```

-#### Register Data Plane on provider
-The provider connector needs to be aware of the streaming capabilities of the embedded dataplane, which can be registered with
-this call:
-```js
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/dataplane.json -X POST "http://localhost:18181/management/v2/dataplanes"
+#### Register Dataplane on provider
+
+The provider connector needs to be aware of the streaming capabilities of the embedded dataplane, which can be
+registered with this call:
+
+```shell
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/dataplane.json -X POST "http://localhost:18181/management/v2/dataplanes" -s | jq
```

-If you look at the `dataplane.json` you'll notice that the supported source is `HttpStreaming` and the supported sink is `HttpData`.
+If you look at the [dataplane.json](dataplane.json) you'll notice that the supported source is `HttpStreaming` and the
+supported sink is `HttpData`.

#### Register Asset, Policy Definition and Contract Definition on provider
-A "source" folder must first be created where the data plane will get the messages to be sent to the consumers.
-To do this, create a temp folder:
+
+A "source" folder must first be created, from which the data plane will read the messages to be sent to the consumers.
+To do this, create a temporary folder:
+
```shell
mkdir /tmp/source
```

Then put the path in the [asset.json](asset.json) file replacing the `{{sourceFolder}}` placeholder.
+
```json
+{
  "dataAddress": {
    "type": "HttpStreaming",
-    "sourceFolder": "{{sourceFolder}}"
+    "sourceFolder": "/tmp/source"
  }
+}
+```
+
+Then use these three calls to create the Asset, the Policy Definition and the Contract Definition:
+
+```shell
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/asset.json -X POST "http://localhost:18181/management/v3/assets" -s | jq
```

-Then create the Asset, the Policy Definition and the Contract Definition with these three calls:
```shell
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/asset.json -X POST "http://localhost:18181/management/v3/assets"
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/policy-definition.json -X POST "http://localhost:18181/management/v2/policydefinitions"
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/contract-definition.json -X POST "http://localhost:18181/management/v2/contractdefinitions"
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/policy-definition.json -X POST "http://localhost:18181/management/v2/policydefinitions" -s | jq
+```
+
+```shell
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/contract-definition.json -X POST "http://localhost:18181/management/v2/contractdefinitions" -s | jq
```

#### Negotiate the contract
-The typical flow requires fetching the catalog from the consumer side and using the contract offer to negotiate a contract.
-However, in this sample case, we already have the provider asset (`"stream-asset"`) so we can get the related dataset
+
+The typical flow requires fetching the catalog from the consumer side and using the contract offer to negotiate a
+contract.
+However, in this sample case, we already have the provider asset `stream-asset`, so we can get the related dataset
directly with this call:
+
```shell
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/get-dataset.json -X POST "http://localhost:28181/management/v2/catalog/dataset/request" -s | jq
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/get-dataset.json -X POST "http://localhost:28181/management/v2/catalog/dataset/request" -s | jq
```

The output will be something like:
+
```json
{
  "@id": "stream-asset",
@@ -97,54 +119,64 @@ The output will be something like:

With the `odrl:hasPolicy/@id` we can now replace it in the [negotiate-contract.json](negotiate-contract.json) file
and request the contract negotiation:
+
```shell
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/negotiate-contract.json -X POST "http://localhost:28181/management/v2/contractnegotiations" -s | jq
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/negotiate-contract.json -X POST "http://localhost:28181/management/v2/contractnegotiations" -s | jq
```

### Start the transfer
-First we need to set up the receiver server on the consumer side that will receive a call for every message. For this
-you'll need to open another terminal shell and run:
+
+First we need to set up the logging webserver on the consumer side, which will receive a call for every message.
+For this you'll need to open another terminal and run:
+
```shell
-./gradlew util:http-request-logger:build
-HTTP_SERVER_PORT=4000 java -jar util/http-request-logger/build/libs/http-request-logger.jar
+docker build -t http-request-logger util/http-request-logger
+docker run -p 4000:4000 http-request-logger
```
+
It will run on port 4000.

-At this point the contract agreement should already been issued, to verify that, please check the contract negotiation state with
-this call, replacing `{{contract-negotiation-id}}` with the id returned by the negotiate contract call.
+At this point the contract agreement should already have been issued. To verify that, check the contract negotiation
+state with this call, replacing `{{contract-negotiation-id}}` with the id returned by the negotiate contract call:
+
```shell
curl "http://localhost:28181/management/v2/contractnegotiations/{{contract-negotiation-id}}" -s | jq
```

-If the `edc:contractAgreementId` is valued, it can be used to start the transfer, replacing it in the [transfer.json](transfer.json)
-file to `{{contract-agreement-id}}` and then calling the connector with this command:
+If the `edc:contractAgreementId` has a value, it can be used to start the transfer: replace the
+`{{contract-agreement-id}}` placeholder in the [transfer.json](transfer.json) file with it and then call the connector
+with this command:
+
```shell
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/transfer.json -X POST "http://localhost:28181/management/v2/transferprocesses" -s | jq
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-01-http-to-http/transfer.json -X POST "http://localhost:28181/management/v2/transferprocesses" -s | jq
```
-> Note that the destination address is `localhost:4000`, this because is where our http server is listening.
+
+> Note that the destination address is `localhost:4000`, because this is where our logging webserver is listening.

-Let's wait until the transfer state is `STARTED` state executing this call, replacing to `{{transfer-process-id}}` the id returned
-by the start transfer call:
+Wait until the transfer state is `STARTED` by executing this call, replacing `{{transfer-process-id}}` with the
+id returned by the start transfer call:
+
```shell
curl "http://localhost:28181/management/v2/transferprocesses/{{transfer-process-id}}" -s | jq
```

-Here we can test the transfer creating a file into the `source` folder that we configured before, e.g. copying the `README.md`
-into the `source` folder:
+Now we can test the transfer by creating a file in the `source` folder that we configured before, e.g. by copying
+the `README.md` into the `source` folder:
+
```shell
cp README.md /tmp/source
```

-we should see the content logged into the received server:
+We should see the content logged by the logging webserver:
+
```
Incoming request
Method: POST
Path: /
Body:
-# EDC Samples
-...
+
```
+
### Up to you: second connector

As a challenge, try starting another consumer connector, negotiating a contract, and starting the transfer.
@@ -152,4 +184,5 @@ Every message pushed by the provider will be sent to all the consumers.

## Technical insight

-The required code is contained in the [`streaming-01-runtime` source folder](transfer/streaming/streaming-01-http-to-http/streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http).
+The required code can be found in the
+[streaming-01-runtime source folder](streaming-01-runtime/src/main/java/org/eclipse/edc/samples/transfer/streaming/http).
diff --git a/transfer/streaming/streaming-02-kafka-to-http/1-asset.json b/transfer/streaming/streaming-02-kafka-to-http/1-asset.json
index 675614c9..a4b7d626 100644
--- a/transfer/streaming/streaming-02-kafka-to-http/1-asset.json
+++ b/transfer/streaming/streaming-02-kafka-to-http/1-asset.json
@@ -6,7 +6,6 @@
  "dataAddress": {
    "type": "Kafka",
    "kafka.bootstrap.servers": "{{bootstrap.servers}}",
-    "maxDuration": "{{max.duration}}",
    "topic": "{{topic}}"
  }
}
diff --git a/transfer/streaming/streaming-02-kafka-to-http/README.md b/transfer/streaming/streaming-02-kafka-to-http/README.md
index aeac12c7..d6a06907 100644
--- a/transfer/streaming/streaming-02-kafka-to-http/README.md
+++ b/transfer/streaming/streaming-02-kafka-to-http/README.md
@@ -1,76 +1,93 @@
-# Streaming KAFKA to HTTP
+# Streaming Kafka to HTTP

-This sample demonstrates how to set up the EDC to stream messages from Kafka to HTTP.
+This sample demonstrates how to set up the Eclipse Dataspace Connector to stream messages from Kafka to HTTP.
This code is only for demonstration purposes and should not be used in production.

## Concept
-We will use the data-plane kafka `DataSource` extension that will pull event records from a kafka topic and push it
+We will use the data plane Kafka `DataSource` extension, which pulls event records from a Kafka topic and pushes them
to every consumer that has started a `TransferProcess` for a related asset.

### Run

Build the connector runtime, which will be used both for the provider and consumer:
+
```shell
./gradlew :transfer:streaming:streaming-02-kafka-to-http:streaming-02-runtime:build
```

-Run the provider and the consumer, which must be started from different terminal shells:
+Run the provider and the consumer, each with its own configuration, from two different terminals.
+
+Provider:
+
```shell
-# provider
export EDC_FS_CONFIG=transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/provider.properties
java -jar transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/build/libs/connector.jar
+```
+
+Consumer:

-#consumer
+```shell
export EDC_FS_CONFIG=transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/consumer.properties
java -jar transfer/streaming/streaming-02-kafka-to-http/streaming-02-runtime/build/libs/connector.jar
```

-### Register Data Plane on provider
+### Register Dataplane on provider
+
+The provider connector needs to be aware of the Kafka streaming capabilities of the embedded dataplane, which can be
+registered with this call:

-The provider connector needs to be aware of the kafka streaming capabilities of the embedded dataplane, which can be registered with
-this call:
```shell
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/0-dataplane.json -X POST "http://localhost:18181/management/v2/dataplanes"
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/0-dataplane.json -X POST "http://localhost:18181/management/v2/dataplanes" -s | jq
```

-If you look at the `0-dataplane.json` you'll notice that the supported source is `Kafka` and the supported sink is `HttpData`.
+If you look at the [0-dataplane.json](0-dataplane.json) you'll notice that the supported source is `Kafka` and the
+supported sink is `HttpData`.
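+
+For reference, the registration body for this sample has roughly the shape sketched below; the `@id` and `url` values
+here are illustrative assumptions, so check [0-dataplane.json](0-dataplane.json) for the actual content:
+
+```json
+{
+  "@context": {
+    "edc": "https://w3id.org/edc/v0.0.1/ns/"
+  },
+  "@id": "kafka-streaming-dataplane",
+  "url": "http://localhost:19192/control/transfer",
+  "allowedSourceTypes": [ "Kafka" ],
+  "allowedDestTypes": [ "HttpData" ]
+}
+```
+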
### Register Asset, Policy Definition and Contract Definition on provider

-A "source" kafka topic must first be created where the data plane will get the event records to be sent to the consumers.
-To do this, initiate a Kafka server with the source topic:
+A "source" Kafka topic must first be created, from which the data plane will read the event records to be sent to the
+consumers. To do this, start a Kafka server with the source topic, replacing `{{topic}}` with the topic name (this
+sample uses `kafka-stream-topic`):
+
```shell
-docker run -e "KAFKA_CREATE_TOPICS={{topic}}:1:1" -p 9092:9092 -d bashj79/kafka-kraft:3.0.0
+docker run --rm --name=kafka-kraft -e "KAFKA_CREATE_TOPICS={{topic}}:1:1" -p 9092:9092 -d bashj79/kafka-kraft:3.0.0
```

-Then put values of `kafka.bootstrap.servers`, `maxDuration` and `topic` in the [1-asset.json](1-asset.json) file replacing their placeholders.
+Then put the values of `kafka.bootstrap.servers` and `topic` into the [1-asset.json](1-asset.json) file, replacing
+their placeholders:
+
```json
+{
  "dataAddress": {
    "type": "Kafka",
-    "kafka.bootstrap.servers": "{{bootstrap.servers}}",
-    "maxDuration": "{{max.duration}}"
-    "topic": "{{topic}}"
+    "kafka.bootstrap.servers": "localhost:9092",
+    "topic": "kafka-stream-topic"
  }
+}
```

-Then create the Asset, the Policy Definition and the Contract Definition with these three calls:
+Then use these three calls to create the Asset, the Policy Definition and the Contract Definition:
+
```shell
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/1-asset.json -X POST "http://localhost:18181/management/v3/assets"
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/2-policy-definition.json -X POST "http://localhost:18181/management/v2/policydefinitions"
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/3-contract-definition.json -X POST "http://localhost:18181/management/v2/contractdefinitions"
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/1-asset.json -X POST "http://localhost:18181/management/v3/assets" -s | jq
+```
+
+```shell
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/2-policy-definition.json -X POST "http://localhost:18181/management/v2/policydefinitions" -s | jq
+```
+
+```shell
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/3-contract-definition.json -X POST "http://localhost:18181/management/v2/contractdefinitions" -s | jq
```

### Negotiate the contract

-The typical flow requires fetching the catalog from the consumer side and using the contract offer to negotiate a contract.
-However, in this sample case, we already have the provider asset (`"kafka-stream-asset"`) so we can get the related dataset
-directly with this call:
+The typical flow requires fetching the catalog from the consumer side and using the contract offer to negotiate a
+contract. However, in this sample case, we already have the provider asset `kafka-stream-asset`, so we can get the
+related dataset directly with this call:
+
```shell
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/4-get-dataset.json -X POST "http://localhost:28181/management/v2/catalog/dataset/request" -s | jq
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/4-get-dataset.json -X POST "http://localhost:28181/management/v2/catalog/dataset/request" -s | jq
```

The output will be something like:
+
```json
{
  "@id": "kafka-stream-asset",
@@ -103,46 +120,59 @@ The output will be something like:

With the `odrl:hasPolicy/@id` we can now replace it in the [negotiate-contract.json](5-negotiate-contract.json) file
and request the contract negotiation:
+
```shell
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/5-negotiate-contract.json -X POST "http://localhost:28181/management/v2/contractnegotiations" -s | jq
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/5-negotiate-contract.json -X POST "http://localhost:28181/management/v2/contractnegotiations" -s | jq
```

### Start the transfer

-First we need to set up the receiver server on the consumer side that will receive a call for every new event. For this
-you'll need to open another terminal shell and run:
+First we need to set up the logging webserver on the consumer side that will receive a call for every new event. For
+this you'll need to open another terminal and run:
+
```shell
-./gradlew util:http-request-logger:build
-HTTP_SERVER_PORT=4000 java -jar util/http-request-logger/build/libs/http-request-logger.jar
+docker build -t http-request-logger util/http-request-logger
+docker run -p 4000:4000 http-request-logger
```
+
It will run on port 4000.

-At this point the contract agreement should already been issued, to verify that, please check the contract negotiation state with
-this call, replacing `{{contract-negotiation-id}}` with the id returned by the negotiate contract call.
+At this point the contract agreement should already have been issued. To verify that, check the contract negotiation
+state with this call, replacing `{{contract-negotiation-id}}` with the id returned by the negotiate contract call:
+
```shell
curl "http://localhost:28181/management/v2/contractnegotiations/{{contract-negotiation-id}}" -s | jq
```

-If the `edc:contractAgreementId` is valued, it can be used to start the transfer, replacing it in the [6-transfer.json](6-transfer.json)
-file to `{{contract-agreement-id}}` and then calling the connector with this command:
+If the `edc:contractAgreementId` has a value, it can be used to start the transfer: replace the
+`{{contract-agreement-id}}` placeholder in the [6-transfer.json](6-transfer.json) file with it and then call the
+connector with this command:
+
```shell
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/6-transfer.json -X POST "http://localhost:28181/management/v2/transferprocesses" -s | jq
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-02-kafka-to-http/6-transfer.json -X POST "http://localhost:28181/management/v2/transferprocesses" -s | jq
```
-> Note that the destination address is `localhost:4000`, this because is where our http server is listening.
-Let's wait until the transfer state is `STARTED` state executing this call, replacing to `{{transfer-process-id}}` the id returned
-by the start transfer call:
+> Note that the destination address is `localhost:4000`, because this is where our logging webserver is listening.
+
+Wait until the transfer state is `STARTED` by executing this call, replacing `{{transfer-process-id}}` with the
+id returned by the start transfer call:
+
```shell
curl "http://localhost:28181/management/v2/transferprocesses/{{transfer-process-id}}" -s | jq
```

### Produce events

-With the Kafka server running in Docker, you can use the Kafka command-line producer `kafka-console-producer.sh` to produce a message. In a new terminal shell, you'll need to execute:
+With the Kafka server running in Docker, you can use the Kafka command-line producer `kafka-console-producer.sh` to
+produce a message. In a new terminal shell, you'll need to execute:
+
```shell
-docker exec -it {{docker-container-id}} /opt/kafka/bin/kafka-console-producer.sh --topic kafka-stream-topic --bootstrap-server localhost:9092
+docker exec -it kafka-kraft /opt/kafka/bin/kafka-console-producer.sh --topic kafka-stream-topic --bootstrap-server localhost:9092
```
-This command will open an interactive prompt for you to input your message. Once you've typed your message and pressed Enter, it will be produced, consumed and pushed to the receiver server. You should observe the content being logged on its terminal shell:
+
+This command will open an interactive prompt for you to input your message. Once you've typed your message and pressed
+Enter, it will be produced, consumed and pushed to the logging webserver. You should observe the content being logged
+on its terminal:

```
Incoming request
@@ -150,5 +180,4 @@ Method: POST
Path: /
Body:
-...
```
\ No newline at end of file
diff --git a/transfer/streaming/streaming-03-kafka-broker/README.md b/transfer/streaming/streaming-03-kafka-broker/README.md
index 046c6c85..bdbcdd52 100644
--- a/transfer/streaming/streaming-03-kafka-broker/README.md
+++ b/transfer/streaming/streaming-03-kafka-broker/README.md
@@ -1,42 +1,47 @@
-# Streaming KAFKA to KAFKA
+# Streaming Kafka to Kafka

-This sample demonstrates how to set up the EDC to stream messages through Kafka.
+This sample demonstrates how to set up the Eclipse Dataspace Connector to stream messages through Kafka.
This code is only for demonstration purposes and should not be used in production.

## Concept

-In this sample the Data-Plane is not used, the consumer will set up a kafka client to poll the messages from the broker
-using some credentials obtained from the transfer process.
+In this sample the dataplane is not used; the consumer connector will set up a Kafka client to poll the messages from
+the broker, using credentials obtained from the transfer process.

-The DataFlow is managed by the [KafkaToKafkaDataFlowController](streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaToKafkaDataFlowController.java),
-that on flow initialization creates an `EndpointDataReference` containing the credentials that the consumer would then use
-to poll the messages.
+The data flow is managed by
+the [KafkaToKafkaDataFlowController](streaming-03-runtime/src/main/java/org/eclipse/edc/samples/streaming/KafkaToKafkaDataFlowController.java),
+which on flow initialization creates an `EndpointDataReference` containing the credentials that the consumer then
+uses to poll the messages.
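+
+As a rough sketch of that idea (this is not the actual controller code: the endpoint, credentials and topic are
+hard-coded here for illustration, while the real implementation resolves them at runtime), the controller essentially
+builds something like:
+
+```java
+import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference;
+
+import java.util.Map;
+import java.util.UUID;
+
+class KafkaEdrSketch {
+
+    // Assembles the EndpointDataReference that is handed to the consumer when the
+    // data flow starts; the consumer uses it to configure its Kafka client.
+    static EndpointDataReference createEdr() {
+        return EndpointDataReference.Builder.newInstance()
+                .id(UUID.randomUUID().toString())
+                .endpoint("localhost:9093")   // the Kafka broker address
+                .authKey("alice")             // SASL username the consumer will use
+                .authCode("alice-secret")     // SASL password the consumer will use
+                .properties(Map.of("https://w3id.org/edc/v0.0.1/ns/topic", "kafka-stream-topic"))
+                .build();
+    }
+}
+```
+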
### Run

Build the connector runtime, which will be used both for the provider and consumer:
+
```shell
./gradlew :transfer:streaming:streaming-03-kafka-broker:streaming-03-runtime:build
```

-Run the provider and the consumer, which must be started from different terminal shells:
+Run the provider and the consumer, each with its own configuration, from two different terminals.
+
+Provider:
+
```shell
-# provider
export EDC_FS_CONFIG=transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/provider.properties
java -jar transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/build/libs/connector.jar
+```
+
+Consumer:

-#consumer
+```shell
export EDC_FS_CONFIG=transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/consumer.properties
java -jar transfer/streaming/streaming-03-kafka-broker/streaming-03-runtime/build/libs/connector.jar
```

### Start Kafka and configure ACLs

-Kafka will be started in [KRaft mode](https://developer.confluent.io/learn/kraft/), a single broker with `SASL_PLAINTEXT`
-as security protocol ([see config](kafka.env)), there will be an `admin` user, responsible for setting up ACLs and producing
-messages, and `alice`, that will be used by the consumer to consume the messages.
+Kafka will be started in [KRaft mode](https://developer.confluent.io/learn/kraft/) as a single broker with
+`SASL_PLAINTEXT` as the security protocol ([see config](kafka.env)). There will be an `admin` user, responsible for
+setting up ACLs and producing messages, and `alice`, who will be used by the consumer to consume the messages.

Run the Kafka container:
+
```shell
docker run --rm --name=kafka-kraft -h kafka-kraft -p 9093:9093 \
  -v "$PWD/transfer/streaming/streaming-03-kafka-broker/kafka-config":/config \
@@ -56,6 +61,7 @@ docker run --rm --name=kafka-kraft -h kafka-kraft -p 9093:9093 \
```

Create the topic `kafka-stream-topic`
+
```shell
docker exec -it kafka-kraft /bin/kafka-topics \
  --topic kafka-stream-topic --create --partitions 1 --replication-factor 1 \
@@ -64,6 +70,7 @@ docker exec -it kafka-kraft /bin/kafka-topics \
```

To give `alice` read permissions on the topic we need to set up ACLs:
+
```shell
docker exec -it kafka-kraft /bin/kafka-acls --command-config /config/admin.properties \
  --bootstrap-server localhost:9093 \
@@ -75,33 +82,45 @@ docker exec -it kafka-kraft /bin/kafka-acls --command-config /config/admin.prope

### Register Asset, Policy Definition and Contract Definition on provider

-Then put values of `kafka.bootstrap.servers`, `maxDuration` and `topic` in the [1-asset.json](1-asset.json) file replacing
-their placeholders this way:
+Now put the values of `kafka.bootstrap.servers` and `topic` into the [1-asset.json](1-asset.json) file,
+replacing their placeholders this way:
+
```json
+{
  "dataAddress": {
    "type": "Kafka",
    "kafka.bootstrap.servers": "localhost:9093",
    "topic": "kafka-stream-topic"
  }
+}
+```
+
+Then use these three calls to create the Asset, the Policy Definition and the Contract Definition:
+
+```shell
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/1-asset.json -X POST "http://localhost:18181/management/v3/assets" -s | jq
```

-Then create the Asset, the Policy Definition and the Contract Definition with these three calls:
```shell
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/1-asset.json -X POST "http://localhost:18181/management/v3/assets"
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/2-policy-definition.json -X POST "http://localhost:18181/management/v2/policydefinitions"
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/3-contract-definition.json -X POST "http://localhost:18181/management/v2/contractdefinitions"
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/2-policy-definition.json -X POST "http://localhost:18181/management/v2/policydefinitions" -s | jq
+```
+
+```shell
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/3-contract-definition.json -X POST "http://localhost:18181/management/v2/contractdefinitions" -s | jq
```

### Negotiate the contract

-The typical flow requires fetching the catalog from the consumer side and using the contract offer to negotiate a contract.
-However, in this sample case, we already have the provider asset (`"kafka-stream-asset"`) so we can get the related dataset
-directly with this call:
+The typical flow requires fetching the catalog from the consumer side and using the contract offer to negotiate a
+contract. However, in this sample case, we already have the provider asset `kafka-stream-asset`, so we can get the
+related dataset directly with this call:
+
```shell
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/4-get-dataset.json -X POST "http://localhost:28181/management/v2/catalog/dataset/request" -s | jq
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/4-get-dataset.json -X POST "http://localhost:28181/management/v2/catalog/dataset/request" -s | jq
```

The output will be something like:
+
```json
{
  "@id": "kafka-stream-asset",
@@ -114,13 +133,7 @@ The output will be something like:
  "odrl:obligation": [],
  "odrl:target": "kafka-stream-asset"
},
-  "dcat:distribution": {
-    "@type": "dcat:Distribution",
-    "dct:format": {
-      "@id": "HttpData"
-    },
-    "dcat:accessService": "b24dfdbc-d17f-4d6e-9b5c-8fa71dacecfc"
-  },
+  "dcat:distribution": [],
  "edc:id": "kafka-stream-asset",
  "@context": {
    "dct": "https://purl.org/dc/terms/",
@@ -134,8 +147,9 @@ The output will be something like:

With the `odrl:hasPolicy/@id` we can now replace it in the [negotiate-contract.json](5-negotiate-contract.json) file
and negotiate the contract:
+
```shell
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/5-negotiate-contract.json -X POST "http://localhost:28181/management/v2/contractnegotiations" -s | jq
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/5-negotiate-contract.json -X POST "http://localhost:28181/management/v2/contractnegotiations" -s | jq
```

### Start the transfer
@@ -143,47 +157,57 @@ curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kaf
First we need to set up the receiver server on the consumer side that will receive the EndpointDataReference containing
the address and credentials to connect to the broker and poll the messages from the topic. For this you'll need to open
another terminal shell and run:
+
```shell
-./gradlew util:http-request-logger:build
-HTTP_SERVER_PORT=4000 java -jar util/http-request-logger/build/libs/http-request-logger.jar
+docker build -t http-request-logger util/http-request-logger
+docker run -p 4000:4000 http-request-logger
```
+
It will run on port 4000.
-At this point the contract agreement should already been issued, to verify that, please check the contract negotiation state with
-this call, replacing `{{contract-negotiation-id}}` with the id returned by the negotiate contract call.
+At this point the contract agreement should already have been issued. To verify that, check the contract negotiation
+state with this call, replacing `{{contract-negotiation-id}}` with the id returned by the negotiate contract call:
+
```shell
curl "http://localhost:28181/management/v2/contractnegotiations/{{contract-negotiation-id}}" -s | jq
```

-If the `edc:contractAgreementId` is valued, it can be used to start the transfer, replacing it in the [6-transfer.json](6-transfer.json)
-file to `{{contract-agreement-id}}` and then calling the connector with this command:
+If the `edc:contractAgreementId` has a value, it can be used to start the transfer: replace the
+`{{contract-agreement-id}}` placeholder in the [6-transfer.json](6-transfer.json) file with it and then call the
+connector with this command:
+
```shell
-curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/6-transfer.json -X POST "http://localhost:28181/management/v2/transferprocesses" -s | jq
+curl -H 'Content-Type: application/json' -d @transfer/streaming/streaming-03-kafka-broker/6-transfer.json -X POST "http://localhost:28181/management/v2/transferprocesses" -s | jq
```
-> Note that the destination address is `localhost:4000`, this because is where our http server is listening.
-Let's wait until the transfer state is `STARTED` state executing this call, replacing to `{{transfer-process-id}}` the id returned
-by the start transfer call:
+> Note that the destination address is `localhost:4000`, because this is where our logging webserver is listening.
+
+Wait until the transfer state is `STARTED` by executing this call, replacing `{{transfer-process-id}}` with the
+id returned by the start transfer call:
+
```shell
curl "http://localhost:28181/management/v2/transferprocesses/{{transfer-process-id}}" -s | jq
```

### Consume events
+
Now in the console of the `http-request-logger` we started before, the `EndpointDataReference` should have appeared:
+
```json
{
-  "id":"8c52a781-2588-4c9b-8c70-4e5ad428eea9",
-  "endpoint":"localhost:9093",
-  "authKey":"alice",
-  "authCode":"alice-secret",
+  "id": "8c52a781-2588-4c9b-8c70-4e5ad428eea9",
+  "endpoint": "localhost:9093",
+  "authKey": "alice",
+  "authCode": "alice-secret",
  "properties": {
-    "https://w3id.org/edc/v0.0.1/ns/topic":"kafka-stream-topic"
+    "https://w3id.org/edc/v0.0.1/ns/topic": "kafka-stream-topic"
  }
}
```

-Using these information on the consumer side we can run a `kafka-console-consumer` with the data received to consume
-messages from the topic:
+Using this information on the consumer side, we can run a `kafka-console-consumer` with the data received to consume
+messages from the topic:
+
```shell
docker exec -it kafka-kraft /bin/kafka-console-consumer --topic kafka-stream-topic \
  --bootstrap-server localhost:9093 \
@@ -196,6 +220,7 @@ docker exec -it kafka-kraft /bin/kafka-console-consumer --topic kafka-stream-top

### Produce events

-In another shell we can put ourselves in the provider shoes and create messages from the producer shell:
+In another shell, we can put ourselves in the provider's shoes and produce messages from the producer shell:
+
```shell
docker exec -it kafka-kraft /bin/kafka-console-producer --topic kafka-stream-topic \
  --producer.config=/config/admin.properties \