Skip to content

Commit

Permalink
Merge pull request #42 from Veeresh-dot-Pattar/master
Browse files Browse the repository at this point in the history
Added changes to support python3
  • Loading branch information
deepamartin authored Aug 6, 2021
2 parents e5d3570 + 52ce49e commit 9904de6
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
30 changes: 23 additions & 7 deletions etl/output/pushtoES.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
import base64
from Crypto.Cipher import AES
from urlparse import urlparse
import ast
from elasticsearch import helpers
from elasticsearch import Elasticsearch
import json
import os
import sys
import yaml
from scriptConst import Constants
import requests

if sys.version_info[0] < 3:
from urlparse import urlparse
from scriptConst import Constants
else:
from urllib.parse import urlparse
from .scriptConst import Constants




def iterateDict(data):
Expand All @@ -35,6 +41,7 @@ def unpad(raw_data):
enc = base64.b64decode(enc)
iv = enc[:16]
cipher = AES.new(Constants.decrypt_key, AES.MODE_CBC, iv)

return unpad(cipher.decrypt( enc[16:] ))

def prepare_data( target, data):
Expand Down Expand Up @@ -65,7 +72,7 @@ def get_es_client( target):
scheme = config.get(Constants.PROTOCOL)
username = config.get(Constants.USERNAME)
password = config.get(Constants.PASSWORD)
if password:
if password and sys.version_info[0] < 3:
password = base64.b64decode(password)

if username and password:
Expand All @@ -89,12 +96,16 @@ def get_es_client( target):
def write_docs_bulk(config,data):

res1 = get_index_conf(decrypt(config["key"]),config["tags"]["projectName"],'metric' )
targets = [iterateDict(res1)]
if sys.version_info[0] < 3:
targets = [iterateDict(res1)]
else:
targets = [res1]

MAXRETRY = 2
TIMEOUT = 10
# response_output = []
for target in targets:

# target["config"]["password"] = "2330996677315"
# target["config"]["username"] = "apmuser"
cfg = target.get(Constants.CONFIG ,{})
Expand Down Expand Up @@ -152,7 +163,7 @@ def send_docs_to_kafka_rest_proxy(target,data):
print("Status Code in kafka code: %s" , str(r.status_code))
# logger.error("Status Code: %s" , str(r.status_code))
# logger.error(r.text)

except Exception as e:
print("Exception kafka post: %s" , e)
# logger.error(e)
Expand All @@ -174,7 +185,12 @@ def get_index_conf(conf,project_name = "", plugin_type = ""):
# name = Constants.CONTROL + "-" + profile_id
name = plugin_type + "-" + profile_id
if conf.get(Constants.PASSWORD):
conf[Constants.PASSWORD] = base64.b64encode(conf[Constants.PASSWORD])

#conf[Constants.PASSWORD] = (base64.b64encode((conf[Constants.PASSWORD]).encode("utf-8"))).decode("utf-8")
if sys.version_info[0] < 3:
conf[Constants.PASSWORD] = base64.b64encode(conf[Constants.PASSWORD])
else:
conf[Constants.PASSWORD] = conf[Constants.PASSWORD]
if tar_type == Constants.ELASTICSEARCH:
es_conf[Constants.TYPE] = Constants.ELASTICSEARCH
if plugin_type:
Expand Down
2 changes: 2 additions & 0 deletions etl/output/scriptConst.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,5 @@ class Constants:
TYPE = "type"
USERNAME = "username"
decrypt_key = "SnappyFlow123456"
TOKEN = 'token'

0 comments on commit 9904de6

Please sign in to comment.