Skip to content

Commit

Permalink
Merge pull request #11 from glassflow/pipeline-token
Browse files Browse the repository at this point in the history
Pipeline access token as class parameter
  • Loading branch information
ashish-bagri authored Apr 8, 2024
2 parents 4503463 + 21e66ca commit 7fea691
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 29 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ Publish a new event into the pipeline
import glassflow

client = glassflow.GlassFlowClient()
pipeline_client = client.pipeline_client(space_id="<str value>", pipeline_id="<str value")
pipeline_client = client.pipeline_client(space_id="<str value>", pipeline_id="<str value", pipeline_access_token="<str value>")
data = {} # your json event
res = pipeline_client.publish(request_body=data, pipeline_access_token="<str token>")
res = pipeline_client.publish(request_body=data)

if res.status_code == 200:
print("Published sucessfully")
Expand All @@ -60,8 +60,8 @@ Consume the transformed event from the pipeline
import glassflow

client = glassflow.GlassFlowClient()
pipeline_client = client.pipeline_client(space_id="<str value>", pipeline_id="<str value")
res = pipeline_client.consume(pipeline_access_token="<str value>")
pipeline_client = client.pipeline_client(space_id="<str value>", pipeline_id="<str value", pipeline_access_token="<str value>")
res = pipeline_client.consume()

if res.status_code == 200:
print(res.body.event)
Expand All @@ -77,8 +77,8 @@ If the transformation failed for any event, they are available in a failed queue
import glassflow

client = glassflow.GlassFlowClient()
pipeline_client = client.pipeline_client(space_id="<str value>", pipeline_id="<str value")
res = pipeline_client.consume_failed(pipeline_access_token="<str value>")
pipeline_client = client.pipeline_client(space_id="<str value>", pipeline_id="<str value", pipeline_access_token="<str value>")
res = pipeline_client.consume_failed()

if res.status_code == 200:
print(res.body.event)
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

setuptools.setup(
name="glassflow",
version="1.0.1",
version="1.0.2",
author="glassflow",
description="GlassFlow Python Client SDK",
url="https://learn.glassflow.dev/docs",
Expand Down
8 changes: 5 additions & 3 deletions src/glassflow/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,19 @@ def __init__(self, organization_id: str = None) -> None:
self.glassflow_config = GlassFlowConfig(rclient)
self.organization_id = organization_id

def pipeline_client(self, space_id: str,
pipeline_id: str) -> PipelineClient:
def pipeline_client(self, space_id: str, pipeline_id: str,
pipeline_access_token: str) -> PipelineClient:
"""Create a new PipelineClient object to interact with a specific pipeline
Args:
space_id: The space id where the pipeline is located
pipeline_id: The pipeline id to interact with
pipeline_access_token: The access token to access the pipeline
Returns:
PipelineClient: Client object to publish and consume events from the given pipeline.
"""
return PipelineClient(glassflow_client=self,
space_id=space_id,
pipeline_id=pipeline_id)
pipeline_id=pipeline_id,
pipeline_access_token=pipeline_access_token)
30 changes: 11 additions & 19 deletions src/glassflow/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,30 @@ class PipelineClient():
space_id: The space id where the pipeline is located
pipeline_id: The pipeline id to interact with
organization_id: Organization ID of the user. If not provided, the default organization will be used
pipeline_access_token: The access token to access the pipeline
"""

def __init__(self, glassflow_client, space_id: str,
pipeline_id: str) -> None:
def __init__(self, glassflow_client, space_id: str, pipeline_id: str,
pipeline_access_token: str) -> None:
"""Create a new PipelineClient object to interact with a specific pipeline
Args:
glassflow_client: GlassFlowClient object to interact with GlassFlow API
space_id: The space id where the pipeline is located
pipeline_id: The pipeline id to interact with
pipeline_access_token: The access token to access the pipeline
"""
self.glassflow_client = glassflow_client
self.space_id = space_id
self.pipeline_id = pipeline_id
self.organization_id = self.glassflow_client.organization_id
self.pipeline_access_token = pipeline_access_token

def publish(self, request_body: dict,
pipeline_access_token: str) -> operations.PublishEventResponse:
def publish(self, request_body: dict) -> operations.PublishEventResponse:
"""Push a new message into the pipeline
Args:
request_body: The message to be published into the pipeline
pipeline_access_token: The access token to access the pipeline
Returns:
PublishEventResponse: Response object containing the status code and the raw response
Expand All @@ -45,7 +46,7 @@ def publish(self, request_body: dict,
organization_id=self.organization_id,
space_id=self.space_id,
pipeline_id=self.pipeline_id,
x_pipeline_access_token=pipeline_access_token,
x_pipeline_access_token=self.pipeline_access_token,
request_body=request_body,
)

Expand Down Expand Up @@ -101,13 +102,9 @@ def publish(self, request_body: dict,

return res

def consume(self,
pipeline_access_token: str) -> operations.ConsumeEventResponse:
def consume(self) -> operations.ConsumeEventResponse:
"""Consume the last message from the pipeline
Args:
pipeline_access_token: The access token to access the pipeline
Returns:
ConsumeEventResponse: Response object containing the status code and the raw response
Expand All @@ -119,7 +116,7 @@ def consume(self,
space_id=self.space_id,
pipeline_id=self.pipeline_id,
organization_id=self.organization_id,
x_pipeline_access_token=pipeline_access_token,
x_pipeline_access_token=self.pipeline_access_token,
)

base_url = self.glassflow_client.glassflow_config.server_url
Expand Down Expand Up @@ -174,14 +171,9 @@ def consume(self,

return res

def consume_failed(
self,
pipeline_access_token: str) -> operations.ConsumeFailedResponse:
def consume_failed(self) -> operations.ConsumeFailedResponse:
"""Consume the failed message from the pipeline
Args:
pipeline_access_token: The access token to access the pipeline
Returns:
ConsumeFailedResponse: Response object containing the status code and the raw response
Expand All @@ -193,7 +185,7 @@ def consume_failed(
space_id=self.space_id,
pipeline_id=self.pipeline_id,
organization_id=self.organization_id,
x_pipeline_access_token=pipeline_access_token,
x_pipeline_access_token=self.pipeline_access_token,
)

base_url = self.glassflow_client.glassflow_config.server_url
Expand Down

0 comments on commit 7fea691

Please sign in to comment.