diff --git a/addons/falcon-bridge-shim/pom.xml b/addons/falcon-bridge-shim/pom.xml index 0e509d0f1b..480d325de2 100755 --- a/addons/falcon-bridge-shim/pom.xml +++ b/addons/falcon-bridge-shim/pom.xml @@ -32,6 +32,10 @@ Apache Atlas Falcon Bridge Shim Apache Atlas Falcon Bridge Shim Module + + true + false + diff --git a/addons/falcon-bridge-shim/src/main/java/org/apache/atlas/falcon/service/AtlasService.java b/addons/falcon-bridge-shim/src/main/java/org/apache/atlas/falcon/service/AtlasService.java index 2b756de0e2..7b464ab33f 100755 --- a/addons/falcon-bridge-shim/src/main/java/org/apache/atlas/falcon/service/AtlasService.java +++ b/addons/falcon-bridge-shim/src/main/java/org/apache/atlas/falcon/service/AtlasService.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,7 +18,6 @@ package org.apache.atlas.falcon.service; - import org.apache.atlas.plugin.classloader.AtlasPluginClassLoader; import org.apache.falcon.FalconException; import org.apache.falcon.entity.store.ConfigurationStore; @@ -34,12 +33,12 @@ public class AtlasService implements FalconService, ConfigurationChangeListener { private static final Logger LOG = LoggerFactory.getLogger(AtlasService.class); - private static final String ATLAS_PLUGIN_TYPE = "falcon"; + private static final String ATLAS_PLUGIN_TYPE = "falcon"; private static final String ATLAS_FALCON_HOOK_IMPL_CLASSNAME = "org.apache.atlas.falcon.service.AtlasService"; - private AtlasPluginClassLoader atlasPluginClassLoader = null; - private FalconService falconServiceImpl = null; - private ConfigurationChangeListener configChangeListenerImpl = null; + private AtlasPluginClassLoader atlasPluginClassLoader; + private FalconService falconServiceImpl; + private ConfigurationChangeListener configChangeListenerImpl; public AtlasService() { this.initialize(); @@ -47,9 +46,7 @@ public AtlasService() { @Override public String getName() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasService.getName()"); - } + LOG.debug("==> AtlasService.getName()"); String ret = null; @@ -60,18 +57,14 @@ public String getName() { deactivatePluginClassLoader(); } - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasService.getName()"); - } + LOG.debug("<== AtlasService.getName()"); return ret; } @Override public void init() throws FalconException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasService.init()"); - } + LOG.debug("==> AtlasService.init()"); try { activatePluginClassLoader(); @@ -83,16 +76,12 @@ public void init() throws FalconException { deactivatePluginClassLoader(); } - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasService.init()"); - } + LOG.debug("<== AtlasService.init()"); } @Override public void destroy() throws FalconException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasService.destroy()"); - } + LOG.debug("==> AtlasService.destroy()"); try { activatePluginClassLoader(); @@ -104,16 +93,12 @@ public void destroy() throws FalconException { deactivatePluginClassLoader(); } - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasService.destroy()"); - } + LOG.debug("<== AtlasService.destroy()"); } @Override public void onAdd(Entity entity) throws FalconException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasService.onAdd({})", entity); - } + LOG.debug("==> AtlasService.onAdd({})", entity); try { activatePluginClassLoader(); @@ -122,16 +107,12 @@ public void onAdd(Entity entity) throws FalconException { deactivatePluginClassLoader(); } - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasService.onAdd({})", entity); - } + LOG.debug("<== AtlasService.onAdd({})", entity); } @Override public void onRemove(Entity entity) throws FalconException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasService.onRemove({})", entity); - } + LOG.debug("==> AtlasService.onRemove({})", entity); try { activatePluginClassLoader(); @@ -140,16 +121,12 @@ public void onRemove(Entity entity) throws FalconException { deactivatePluginClassLoader(); } - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasService.onRemove({})", entity); - } + LOG.debug("<== AtlasService.onRemove({})", entity); } @Override public void onChange(Entity entity, Entity entity1) throws FalconException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasService.onChange({}, {})", entity, entity1); - } + LOG.debug("==> AtlasService.onChange({}, {})", entity, entity1); try { activatePluginClassLoader(); @@ -158,16 +135,12 @@ public void onChange(Entity entity, Entity entity1) throws FalconException { deactivatePluginClassLoader(); } - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasService.onChange({}, {})", entity, entity1); - } + LOG.debug("<== AtlasService.onChange({}, {})", entity, entity1); } @Override public void onReload(Entity entity) throws FalconException { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasService.onReload({})", entity); - } + LOG.debug("==> AtlasService.onReload({})", entity); try { activatePluginClassLoader(); @@ -176,15 +149,11 @@ public void onReload(Entity entity) throws FalconException { deactivatePluginClassLoader(); } - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasService.onReload({})", entity); - } + LOG.debug("<== AtlasService.onReload({})", entity); } private void initialize() { - if (LOG.isDebugEnabled()) { - LOG.debug("==> AtlasService.initialize()"); - } + LOG.debug("==> AtlasService.initialize()"); try { atlasPluginClassLoader = AtlasPluginClassLoader.getInstance(ATLAS_PLUGIN_TYPE, this.getClass()); @@ -195,7 +164,7 @@ private void initialize() { Object atlasService = cls.newInstance(); - falconServiceImpl = (FalconService) atlasService; + falconServiceImpl = (FalconService) atlasService; configChangeListenerImpl = (ConfigurationChangeListener) atlasService; } catch (Exception excp) { LOG.error("Error instantiating Atlas hook implementation", excp); @@ -203,9 +172,7 @@ private void initialize() { deactivatePluginClassLoader(); } - if (LOG.isDebugEnabled()) { - LOG.debug("<== AtlasService.initialize()"); - } + LOG.debug("<== AtlasService.initialize()"); } private void activatePluginClassLoader() { diff --git a/addons/falcon-bridge/pom.xml b/addons/falcon-bridge/pom.xml index 50c014f6c6..7dfe6c55af 100644 --- a/addons/falcon-bridge/pom.xml +++ b/addons/falcon-bridge/pom.xml @@ -32,6 +32,11 @@ Apache Atlas Falcon Bridge Apache Atlas Falcon Bridge Module + + true + false + + org.apache.atlas diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java index cbf002f4fa..f21dd17c3b 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/bridge/FalconBridge.java @@ -20,8 +20,8 @@ import org.apache.atlas.AtlasClient; import org.apache.atlas.AtlasConstants; -import org.apache.atlas.falcon.Util.EventUtil; import org.apache.atlas.falcon.model.FalconDataTypes; +import org.apache.atlas.falcon.util.EventUtil; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.model.HiveDataTypes; import org.apache.atlas.v1.model.instance.Referenceable; @@ -60,15 +60,18 @@ public class FalconBridge { private static final Logger LOG = LoggerFactory.getLogger(FalconBridge.class); - public static final String COLO = "colo"; - public static final String TAGS = "tags"; - public static final String GROUPS = "groups"; - public static final String PIPELINES = "pipelines"; - public static final String WFPROPERTIES = "workflow-properties"; - public static final String RUNSON = "runs-on"; - public static final String STOREDIN = "stored-in"; - public static final String FREQUENCY = "frequency"; - public static final String ATTRIBUTE_DB = "db"; + public static final String COLO = "colo"; + public static final String TAGS = "tags"; + public static final String GROUPS = "groups"; + public static final String PIPELINES = "pipelines"; + public static final String WFPROPERTIES = "workflow-properties"; + public static final String RUNSON = "runs-on"; + public static final String STOREDIN = "stored-in"; + public static final String FREQUENCY = "frequency"; + public static final String ATTRIBUTE_DB = "db"; + + private FalconBridge() { + } /** * Creates cluster entity @@ -92,75 +95,49 @@ public static Referenceable createClusterEntity(final org.apache.falcon.entity.v } if (StringUtils.isNotEmpty(cluster.getTags())) { - clusterRef.set(FalconBridge.TAGS, - EventUtil.convertKeyValueStringToMap(cluster.getTags())); + clusterRef.set(FalconBridge.TAGS, EventUtil.convertKeyValueStringToMap(cluster.getTags())); } return clusterRef; } - private static Referenceable createFeedEntity(Feed feed, Referenceable clusterReferenceable) { - LOG.info("Creating feed dataset: {}", feed.getName()); - - Referenceable feedEntity = new Referenceable(FalconDataTypes.FALCON_FEED.getName()); - feedEntity.set(AtlasClient.NAME, feed.getName()); - feedEntity.set(AtlasClient.DESCRIPTION, feed.getDescription()); - String feedQualifiedName = - getFeedQualifiedName(feed.getName(), (String) clusterReferenceable.get(AtlasClient.NAME)); - feedEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedQualifiedName); - feedEntity.set(FalconBridge.FREQUENCY, feed.getFrequency().toString()); - feedEntity.set(FalconBridge.STOREDIN, clusterReferenceable); - if (feed.getACL() != null) { - feedEntity.set(AtlasClient.OWNER, feed.getACL().getOwner()); - } - - if (StringUtils.isNotEmpty(feed.getTags())) { - feedEntity.set(FalconBridge.TAGS, - EventUtil.convertKeyValueStringToMap(feed.getTags())); - } - - if (feed.getGroups() != null) { - feedEntity.set(FalconBridge.GROUPS, feed.getGroups()); - } - - return feedEntity; - } - public static List createFeedCreationEntity(Feed feed, ConfigurationStore falconStore) throws FalconException, URISyntaxException { LOG.info("Creating feed : {}", feed.getName()); List entities = new ArrayList<>(); if (feed.getClusters() != null) { - List replicationInputs = new ArrayList<>(); + List replicationInputs = new ArrayList<>(); List replicationOutputs = new ArrayList<>(); for (org.apache.falcon.entity.v0.feed.Cluster feedCluster : feed.getClusters().getClusters()) { - org.apache.falcon.entity.v0.cluster.Cluster cluster = falconStore.get(EntityType.CLUSTER, - feedCluster.getName()); + org.apache.falcon.entity.v0.cluster.Cluster cluster = falconStore.get(EntityType.CLUSTER, feedCluster.getName()); // set cluster Referenceable clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo()); + entities.add(clusterReferenceable); // input as hive_table or hdfs_path, output as falcon_feed dataset - List inputs = new ArrayList<>(); + List inputs = new ArrayList<>(); List inputReferenceables = getInputEntities(cluster, feed); + if (inputReferenceables != null) { entities.addAll(inputReferenceables); inputs.add(inputReferenceables.get(inputReferenceables.size() - 1)); } - List outputs = new ArrayList<>(); - Referenceable feedEntity = createFeedEntity(feed, clusterReferenceable); + List outputs = new ArrayList<>(); + Referenceable feedEntity = createFeedEntity(feed, clusterReferenceable); + if (feedEntity != null) { entities.add(feedEntity); outputs.add(feedEntity); } if (!inputs.isEmpty() || !outputs.isEmpty()) { - Referenceable feedCreateEntity = new Referenceable(FalconDataTypes.FALCON_FEED_CREATION.getName()); - String feedQualifiedName = getFeedQualifiedName(feed.getName(), cluster.getName()); + Referenceable feedCreateEntity = new Referenceable(FalconDataTypes.FALCON_FEED_CREATION.getName()); + String feedQualifiedName = getFeedQualifiedName(feed.getName(), cluster.getName()); feedCreateEntity.set(AtlasClient.NAME, feed.getName()); feedCreateEntity.set(AtlasClient.DESCRIPTION, "Feed creation - " + feed.getName()); @@ -169,6 +146,7 @@ public static List createFeedCreationEntity(Feed feed, Configurat if (!inputs.isEmpty()) { feedCreateEntity.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, inputs); } + if (!outputs.isEmpty()) { feedCreateEntity.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, outputs); } @@ -185,32 +163,29 @@ public static List createFeedCreationEntity(Feed feed, Configurat } if (!replicationInputs.isEmpty() && !replicationInputs.isEmpty()) { - Referenceable feedReplicationEntity = new Referenceable(FalconDataTypes - .FALCON_FEED_REPLICATION.getName()); + Referenceable feedReplicationEntity = new Referenceable(FalconDataTypes.FALCON_FEED_REPLICATION.getName()); feedReplicationEntity.set(AtlasClient.NAME, feed.getName()); feedReplicationEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feed.getName()); - feedReplicationEntity.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, replicationInputs); feedReplicationEntity.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, replicationOutputs); + entities.add(feedReplicationEntity); } - } + return entities; } /** * Creates process entity - * + * * @param process process entity * @param falconStore config store * @return process instance reference - * * @throws FalconException if retrieving from the configuration store fail */ - public static List createProcessEntity(org.apache.falcon.entity.v0.process.Process process, - ConfigurationStore falconStore) throws FalconException { + public static List createProcessEntity(org.apache.falcon.entity.v0.process.Process process, ConfigurationStore falconStore) throws FalconException { LOG.info("Creating process Entity : {}", process.getName()); // The requirement is for each cluster, create a process entity with name @@ -218,44 +193,47 @@ public static List createProcessEntity(org.apache.falcon.entity.v List entities = new ArrayList<>(); if (process.getClusters() != null) { - for (Cluster processCluster : process.getClusters().getClusters()) { - org.apache.falcon.entity.v0.cluster.Cluster cluster = - falconStore.get(EntityType.CLUSTER, processCluster.getName()); - Referenceable clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo()); + org.apache.falcon.entity.v0.cluster.Cluster cluster = falconStore.get(EntityType.CLUSTER, processCluster.getName()); + Referenceable clusterReferenceable = getClusterEntityReference(cluster.getName(), cluster.getColo()); + entities.add(clusterReferenceable); List inputs = new ArrayList<>(); + if (process.getInputs() != null) { for (Input input : process.getInputs().getInputs()) { - Feed feed = falconStore.get(EntityType.FEED, input.getFeed()); + Feed feed = falconStore.get(EntityType.FEED, input.getFeed()); Referenceable inputReferenceable = getFeedDataSetReference(feed, clusterReferenceable); + entities.add(inputReferenceable); inputs.add(inputReferenceable); } } List outputs = new ArrayList<>(); + if (process.getOutputs() != null) { for (Output output : process.getOutputs().getOutputs()) { - Feed feed = falconStore.get(EntityType.FEED, output.getFeed()); + Feed feed = falconStore.get(EntityType.FEED, output.getFeed()); Referenceable outputReferenceable = getFeedDataSetReference(feed, clusterReferenceable); + entities.add(outputReferenceable); outputs.add(outputReferenceable); } } if (!inputs.isEmpty() || !outputs.isEmpty()) { - Referenceable processEntity = new Referenceable(FalconDataTypes.FALCON_PROCESS.getName()); + processEntity.set(AtlasClient.NAME, process.getName()); - processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - getProcessQualifiedName(process.getName(), cluster.getName())); + processEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getProcessQualifiedName(process.getName(), cluster.getName())); processEntity.set(FalconBridge.FREQUENCY, process.getFrequency().toString()); if (!inputs.isEmpty()) { processEntity.set(AtlasClient.PROCESS_ATTRIBUTE_INPUTS, inputs); } + if (!outputs.isEmpty()) { processEntity.set(AtlasClient.PROCESS_ATTRIBUTE_OUTPUTS, outputs); } @@ -269,43 +247,89 @@ public static List createProcessEntity(org.apache.falcon.entity.v } if (StringUtils.isNotEmpty(process.getTags())) { - processEntity.set(FalconBridge.TAGS, - EventUtil.convertKeyValueStringToMap(process.getTags())); + processEntity.set(FalconBridge.TAGS, EventUtil.convertKeyValueStringToMap(process.getTags())); } if (process.getPipelines() != null) { processEntity.set(FalconBridge.PIPELINES, process.getPipelines()); } - processEntity.set(FalconBridge.WFPROPERTIES, - getProcessEntityWFProperties(process.getWorkflow(), - process.getName())); + processEntity.set(FalconBridge.WFPROPERTIES, getProcessEntityWFProperties(process.getWorkflow(), process.getName())); entities.add(processEntity); } - } } + return entities; } - private static List getInputEntities(org.apache.falcon.entity.v0.cluster.Cluster cluster, - Feed feed) throws URISyntaxException { + public static String getFeedQualifiedName(final String feedName, final String clusterName) { + return String.format("%s@%s", feedName, clusterName); + } + + public static String getProcessQualifiedName(final String processName, final String clusterName) { + return String.format("%s@%s", processName, clusterName); + } + + public static String normalize(final String str) { + if (StringUtils.isBlank(str)) { + return null; + } + + return str.toLowerCase().trim(); + } + + private static Referenceable createFeedEntity(Feed feed, Referenceable clusterReferenceable) { + LOG.info("Creating feed dataset: {}", feed.getName()); + + Referenceable feedEntity = new Referenceable(FalconDataTypes.FALCON_FEED.getName()); + + feedEntity.set(AtlasClient.NAME, feed.getName()); + feedEntity.set(AtlasClient.DESCRIPTION, feed.getDescription()); + + String feedQualifiedName = getFeedQualifiedName(feed.getName(), (String) clusterReferenceable.get(AtlasClient.NAME)); + + feedEntity.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, feedQualifiedName); + feedEntity.set(FalconBridge.FREQUENCY, feed.getFrequency().toString()); + feedEntity.set(FalconBridge.STOREDIN, clusterReferenceable); + + if (feed.getACL() != null) { + feedEntity.set(AtlasClient.OWNER, feed.getACL().getOwner()); + } + + if (StringUtils.isNotEmpty(feed.getTags())) { + feedEntity.set(FalconBridge.TAGS, EventUtil.convertKeyValueStringToMap(feed.getTags())); + } + + if (feed.getGroups() != null) { + feedEntity.set(FalconBridge.GROUPS, feed.getGroups()); + } + + return feedEntity; + } + + private static List getInputEntities(org.apache.falcon.entity.v0.cluster.Cluster cluster, Feed feed) throws URISyntaxException { org.apache.falcon.entity.v0.feed.Cluster feedCluster = FeedHelper.getCluster(feed, cluster.getName()); - if(feedCluster != null) { + if (feedCluster != null) { final CatalogTable table = getTable(feedCluster, feed); + if (table != null) { CatalogStorage storage = new CatalogStorage(cluster, table); - return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(), - storage.getTable().toLowerCase()); + + return createHiveTableInstance(cluster.getName(), storage.getDatabase().toLowerCase(), storage.getTable().toLowerCase()); } else { List locations = FeedHelper.getLocations(feedCluster, feed); + if (CollectionUtils.isNotEmpty(locations)) { Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA); + if (dataLocation != null) { final String pathUri = normalize(dataLocation.getPath()); + LOG.info("Registering DFS Path {} ", pathUri); + return fillHDFSDataSet(pathUri, cluster.getName()); } } @@ -326,91 +350,83 @@ private static CatalogTable getTable(org.apache.falcon.entity.v0.feed.Cluster cl private static List fillHDFSDataSet(final String pathUri, final String clusterName) { List entities = new ArrayList<>(); - Referenceable ref = new Referenceable(HiveMetaStoreBridge.HDFS_PATH); + Referenceable ref = new Referenceable(HiveMetaStoreBridge.HDFS_PATH); + ref.set("path", pathUri); + // Path path = new Path(pathUri); // ref.set("name", path.getName()); //TODO - Fix after ATLAS-542 to shorter Name Path path = new Path(pathUri); + ref.set(AtlasClient.NAME, Path.getPathWithoutSchemeAndAuthority(path).toString().toLowerCase()); ref.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, pathUri); ref.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); + entities.add(ref); + return entities; } private static Referenceable createHiveDatabaseInstance(String clusterName, String dbName) { Referenceable dbRef = new Referenceable(HiveDataTypes.HIVE_DB.getName()); + dbRef.set(AtlasConstants.CLUSTER_NAME_ATTRIBUTE, clusterName); dbRef.set(AtlasClient.NAME, dbName); - dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName)); + dbRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getDBQualifiedName(clusterName, dbName)); + return dbRef; } - private static List createHiveTableInstance(String clusterName, String dbName, - String tableName) { + private static List createHiveTableInstance(String clusterName, String dbName, String tableName) { List entities = new ArrayList<>(); - Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName); + Referenceable dbRef = createHiveDatabaseInstance(clusterName, dbName); + entities.add(dbRef); Referenceable tableRef = new Referenceable(HiveDataTypes.HIVE_TABLE.getName()); - tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, - HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName)); + + tableRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName)); tableRef.set(AtlasClient.NAME, tableName.toLowerCase()); tableRef.set(ATTRIBUTE_DB, dbRef); + entities.add(tableRef); return entities; } - private static Referenceable getClusterEntityReference(final String clusterName, - final String colo) { + private static Referenceable getClusterEntityReference(final String clusterName, final String colo) { LOG.info("Getting reference for entity {}", clusterName); + Referenceable clusterRef = new Referenceable(FalconDataTypes.FALCON_CLUSTER.getName()); + clusterRef.set(AtlasClient.NAME, String.format("%s", clusterName)); clusterRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, clusterName); clusterRef.set(FalconBridge.COLO, colo); + return clusterRef; } - private static Referenceable getFeedDataSetReference(Feed feed, Referenceable clusterReference) { LOG.info("Getting reference for entity {}", feed.getName()); + Referenceable feedDatasetRef = new Referenceable(FalconDataTypes.FALCON_FEED.getName()); + feedDatasetRef.set(AtlasClient.NAME, feed.getName()); - feedDatasetRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getFeedQualifiedName(feed.getName(), - (String) clusterReference.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME))); + feedDatasetRef.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, getFeedQualifiedName(feed.getName(), (String) clusterReference.get(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME))); feedDatasetRef.set(FalconBridge.STOREDIN, clusterReference); feedDatasetRef.set(FalconBridge.FREQUENCY, feed.getFrequency()); + return feedDatasetRef; } - private static Map getProcessEntityWFProperties(final Workflow workflow, - final String processName) { + private static Map getProcessEntityWFProperties(final Workflow workflow, final String processName) { Map wfProperties = new HashMap<>(); - wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), - ProcessHelper.getProcessWorkflowName(workflow.getName(), processName)); - wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), - workflow.getVersion()); - wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), - workflow.getEngine().value()); - return wfProperties; - } - - public static String getFeedQualifiedName(final String feedName, final String clusterName) { - return String.format("%s@%s", feedName, clusterName); - } - - public static String getProcessQualifiedName(final String processName, final String clusterName) { - return String.format("%s@%s", processName, clusterName); - } + wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_NAME.getName(), ProcessHelper.getProcessWorkflowName(workflow.getName(), processName)); + wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_VERSION.getName(), workflow.getVersion()); + wfProperties.put(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE.getName(), workflow.getEngine().value()); - public static String normalize(final String str) { - if (StringUtils.isBlank(str)) { - return null; - } - return str.toLowerCase().trim(); + return wfProperties; } } diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java index 51db894ab6..37dda6ed04 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/event/FalconEvent.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -24,23 +24,14 @@ * Falcon event to interface with Atlas Service. */ public class FalconEvent { - protected String user; + protected String user; protected OPERATION operation; - protected Entity entity; + protected Entity entity; public FalconEvent(String doAsUser, OPERATION falconOperation, Entity entity) { - this.user = doAsUser; + this.user = doAsUser; this.operation = falconOperation; - this.entity = entity; - } - - public enum OPERATION { - ADD_CLUSTER, - UPDATE_CLUSTER, - ADD_FEED, - UPDATE_FEED, - ADD_PROCESS, - UPDATE_PROCESS, + this.entity = entity; } public String getUser() { @@ -54,4 +45,13 @@ public OPERATION getOperation() { public Entity getEntity() { return entity; } + + public enum OPERATION { + ADD_CLUSTER, + UPDATE_CLUSTER, + ADD_FEED, + UPDATE_FEED, + ADD_PROCESS, + UPDATE_PROCESS, + } } diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java index b8a73cbe63..3a0f35d8ef 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/hook/FalconHook.java @@ -35,6 +35,7 @@ import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; + import static org.apache.atlas.repository.Constants.FALCON_SOURCE; /** @@ -43,31 +44,17 @@ public class FalconHook extends AtlasHook implements FalconEventPublisher { private static final Logger LOG = LoggerFactory.getLogger(FalconHook.class); - private static ConfigurationStore STORE; + private static ConfigurationStore store; @Override public String getMessageSource() { return FALCON_SOURCE; } - private enum Operation { - ADD, - UPDATE - } - - static { - try { - STORE = ConfigurationStore.get(); - } catch (Exception e) { - LOG.error("Caught exception initializing the falcon hook.", e); - } - - LOG.info("Created Atlas Hook for Falcon"); - } - @Override public void publish(final Data data) { final FalconEvent event = data.getEvent(); + try { fireAndForget(event); } catch (Throwable t) { @@ -77,17 +64,19 @@ public void publish(final Data data) { private void fireAndForget(FalconEvent event) throws FalconException, URISyntaxException { LOG.info("Entered Atlas hook for Falcon hook operation {}", event.getOperation()); + List messages = new ArrayList<>(); + Operation op = getOperation(event.getOperation()); + String user = getUser(event.getUser()); - Operation op = getOperation(event.getOperation()); - String user = getUser(event.getUser()); LOG.info("fireAndForget user:{}", user); - switch (op) { - case ADD: - messages.add(new EntityCreateRequest(user, createEntities(event, user))); - break; + switch (op) { + case ADD: + messages.add(new EntityCreateRequest(user, createEntities(event, user))); + break; } + notifyEntities(messages, null); } @@ -95,24 +84,23 @@ private List createEntities(FalconEvent event, String user) throw List entities = new ArrayList<>(); switch (event.getOperation()) { - case ADD_CLUSTER: - entities.add(FalconBridge - .createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster) event.getEntity())); - break; - - case ADD_PROCESS: - entities.addAll(FalconBridge.createProcessEntity((Process) event.getEntity(), STORE)); - break; - - case ADD_FEED: - entities.addAll(FalconBridge.createFeedCreationEntity((Feed) event.getEntity(), STORE)); - break; - - case UPDATE_CLUSTER: - case UPDATE_FEED: - case UPDATE_PROCESS: - default: - LOG.info("Falcon operation {} is not valid or supported", event.getOperation()); + case ADD_CLUSTER: + entities.add(FalconBridge.createClusterEntity((org.apache.falcon.entity.v0.cluster.Cluster) event.getEntity())); + break; + + case ADD_PROCESS: + entities.addAll(FalconBridge.createProcessEntity((Process) event.getEntity(), store)); + break; + + case ADD_FEED: + entities.addAll(FalconBridge.createFeedCreationEntity((Feed) event.getEntity(), store)); + break; + + case UPDATE_CLUSTER: + case UPDATE_FEED: + case UPDATE_PROCESS: + default: + LOG.info("Falcon operation {} is not valid or supported", event.getOperation()); } return entities; @@ -120,19 +108,33 @@ private List createEntities(FalconEvent event, String user) throw private static Operation getOperation(final FalconEvent.OPERATION op) throws FalconException { switch (op) { - case ADD_CLUSTER: - case ADD_FEED: - case ADD_PROCESS: - return Operation.ADD; - - case UPDATE_CLUSTER: - case UPDATE_FEED: - case UPDATE_PROCESS: - return Operation.UPDATE; - - default: - throw new FalconException("Falcon operation " + op + " is not valid or supported"); + case ADD_CLUSTER: + case ADD_FEED: + case ADD_PROCESS: + return Operation.ADD; + + case UPDATE_CLUSTER: + case UPDATE_FEED: + case UPDATE_PROCESS: + return Operation.UPDATE; + + default: + throw new FalconException("Falcon operation " + op + " is not valid or supported"); } } -} + private enum Operation { + ADD, + UPDATE + } + + static { + try { + store = ConfigurationStore.get(); + } catch (Exception e) { + LOG.error("Caught exception initializing the falcon hook.", e); + } + + LOG.info("Created Atlas Hook for Falcon"); + } +} diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java index e36ff23aff..ca1032ddc3 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/model/FalconDataTypes.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -32,5 +32,4 @@ public enum FalconDataTypes { public String getName() { return name().toLowerCase(); } - } diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java index a01ec14beb..a212443042 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/publisher/FalconEventPublisher.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,13 +18,14 @@ package org.apache.atlas.falcon.publisher; - import org.apache.atlas.falcon.event.FalconEvent; /** * Falcon publisher for Atlas */ public interface FalconEventPublisher { + void publish(Data data); + class Data { private FalconEvent event; @@ -36,6 +37,4 @@ public FalconEvent getEvent() { return event; } } - - void publish(final Data data); } diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java index 7482ba7b82..e3014a4084 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/service/AtlasService.java @@ -18,10 +18,10 @@ package org.apache.atlas.falcon.service; -import org.apache.atlas.falcon.Util.EventUtil; import org.apache.atlas.falcon.event.FalconEvent; import org.apache.atlas.falcon.hook.FalconHook; import org.apache.atlas.falcon.publisher.FalconEventPublisher; +import org.apache.atlas.falcon.util.EventUtil; import org.apache.falcon.FalconException; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; @@ -30,20 +30,19 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; - /** * Atlas service to publish Falcon events */ public class AtlasService implements FalconService, ConfigurationChangeListener { - private static final Logger LOG = LoggerFactory.getLogger(AtlasService.class); - private FalconEventPublisher publisher; /** * Constant for the service name. */ public static final String SERVICE_NAME = AtlasService.class.getSimpleName(); + private FalconEventPublisher publisher; + @Override public String getName() { return SERVICE_NAME; @@ -63,22 +62,22 @@ public void onAdd(Entity entity) throws FalconException { try { EntityType entityType = entity.getEntityType(); switch (entityType) { - case CLUSTER: - addEntity(entity, FalconEvent.OPERATION.ADD_CLUSTER); - break; + case CLUSTER: + addEntity(entity, FalconEvent.OPERATION.ADD_CLUSTER); + break; - case PROCESS: - addEntity(entity, FalconEvent.OPERATION.ADD_PROCESS); - break; + case PROCESS: + addEntity(entity, FalconEvent.OPERATION.ADD_PROCESS); + break; - case FEED: - addEntity(entity, FalconEvent.OPERATION.ADD_FEED); - break; + case FEED: + addEntity(entity, FalconEvent.OPERATION.ADD_FEED); + break; - default: - LOG.debug("Entity type not processed {}", entityType); + default: + LOG.debug("Entity type not processed {}", entityType); } - } catch(Throwable t) { + } catch (Throwable t) { LOG.warn("Error handling entity {}", entity, t); } } @@ -91,26 +90,26 @@ public void onRemove(Entity entity) throws FalconException { public void onChange(Entity oldEntity, Entity newEntity) throws FalconException { /** * Skipping update for now - update uses full update currently and this might result in all attributes wiped for hive entities - EntityType entityType = newEntity.getEntityType(); - switch (entityType) { - case CLUSTER: - addEntity(newEntity, FalconEvent.OPERATION.UPDATE_CLUSTER); - break; - - case PROCESS: - addEntity(newEntity, FalconEvent.OPERATION.UPDATE_PROCESS); - break; - - case FEED: - FalconEvent.OPERATION operation = isReplicationFeed((Feed) newEntity) ? - FalconEvent.OPERATION.UPDATE_REPLICATION_FEED : - FalconEvent.OPERATION.UPDATE_FEED; - addEntity(newEntity, operation); - break; - - default: - LOG.debug("Entity type not processed {}", entityType); - } + EntityType entityType = newEntity.getEntityType(); + switch (entityType) { + case CLUSTER: + addEntity(newEntity, FalconEvent.OPERATION.UPDATE_CLUSTER); + break; + + case PROCESS: + addEntity(newEntity, FalconEvent.OPERATION.UPDATE_PROCESS); + break; + + case FEED: + FalconEvent.OPERATION operation = isReplicationFeed((Feed) newEntity) ? + FalconEvent.OPERATION.UPDATE_REPLICATION_FEED : + FalconEvent.OPERATION.UPDATE_FEED; + addEntity(newEntity, operation); + break; + + default: + LOG.debug("Entity type not processed {}", entityType); + } **/ } @@ -124,9 +123,9 @@ private void addEntity(Entity entity, FalconEvent.OPERATION operation) throws Fa LOG.info("Adding {} entity to Atlas: {}", entity.getEntityType().name(), entity.getName()); try { - FalconEvent event = - new FalconEvent(EventUtil.getUser(), operation, entity); - FalconEventPublisher.Data data = new FalconEventPublisher.Data(event); + FalconEvent event = new FalconEvent(EventUtil.getUser(), operation, entity); + FalconEventPublisher.Data data = new FalconEventPublisher.Data(event); + publisher.publish(data); } catch (Exception ex) { throw new FalconException("Unable to publish data to publisher " + ex.getMessage(), ex); diff --git a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/util/EventUtil.java similarity index 86% rename from addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java rename to addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/util/EventUtil.java index ef5634009d..bcf838ca23 100644 --- a/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/Util/EventUtil.java +++ b/addons/falcon-bridge/src/main/java/org/apache/atlas/falcon/util/EventUtil.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.atlas.falcon.Util; +package org.apache.atlas.falcon.util; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; @@ -29,24 +29,24 @@ * Falcon event util */ public final class EventUtil { - private EventUtil() {} - public static Map convertKeyValueStringToMap(final String keyValueString) { if (StringUtils.isBlank(keyValueString)) { return null; } Map keyValueMap = new HashMap<>(); + String[] tags = keyValueString.split(","); - String[] tags = keyValueString.split(","); for (String tag : tags) { - int index = tag.indexOf("="); - String tagKey = tag.substring(0, index).trim(); + int index = tag.indexOf("="); + String tagKey = tag.substring(0, index).trim(); String tagValue = tag.substring(index + 1, tag.length()).trim(); + keyValueMap.put(tagKey, tagValue); } + return keyValueMap; } @@ -56,6 +56,7 @@ public static String getUser() throws FalconException { } catch (Exception ioe) { //Ignore is failed to get user, uses login user } + return null; } } diff --git a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java index e77f4c96d4..c96479dad5 100644 --- a/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java +++ b/addons/falcon-bridge/src/test/java/org/apache/atlas/falcon/hook/FalconHookIT.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -23,17 +23,17 @@ import org.apache.atlas.AtlasClientV2; import org.apache.atlas.falcon.bridge.FalconBridge; import org.apache.atlas.falcon.model.FalconDataTypes; +import org.apache.atlas.falcon.service.AtlasService; import org.apache.atlas.hive.bridge.HiveMetaStoreBridge; import org.apache.atlas.hive.model.HiveDataTypes; import org.apache.atlas.model.instance.AtlasEntity; import org.apache.atlas.model.instance.AtlasObjectId; import org.apache.atlas.type.AtlasTypeUtil; -import org.apache.atlas.v1.typesystem.types.utils.TypesUtil; import org.apache.atlas.utils.AuthenticationUtil; import org.apache.atlas.utils.ParamChecker; +import org.apache.atlas.v1.typesystem.types.utils.TypesUtil; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang.RandomStringUtils; -import org.apache.atlas.falcon.service.AtlasService; import org.apache.falcon.entity.FeedHelper; import org.apache.falcon.entity.FileSystemStorage; import org.apache.falcon.entity.store.ConfigurationStore; @@ -50,6 +50,7 @@ import org.testng.annotations.Test; import javax.xml.bind.JAXBException; + import java.util.Collections; import java.util.List; import java.util.Map; @@ -61,33 +62,145 @@ public class FalconHookIT { public static final Logger LOG = org.slf4j.LoggerFactory.getLogger(FalconHookIT.class); - public static final String CLUSTER_RESOURCE = "/cluster.xml"; - public static final String FEED_RESOURCE = "/feed.xml"; - public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml"; + public static final String CLUSTER_RESOURCE = "/cluster.xml"; + public static final String FEED_RESOURCE = "/feed.xml"; + public static final String FEED_HDFS_RESOURCE = "/feed-hdfs.xml"; public static final String FEED_REPLICATION_RESOURCE = "/feed-replication.xml"; - public static final String PROCESS_RESOURCE = "/process.xml"; - - private AtlasClientV2 atlasClient; + public static final String PROCESS_RESOURCE = "/process.xml"; private static final ConfigurationStore STORE = ConfigurationStore.get(); + private AtlasClientV2 atlasClient; + @BeforeClass public void setUp() throws Exception { Configuration atlasProperties = ApplicationProperties.get(); + if (!AuthenticationUtil.isKerberosAuthenticationEnabled()) { - atlasClient = new AtlasClientV2(atlasProperties.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT), new String[]{"admin", "admin"}); + atlasClient = new AtlasClientV2(atlasProperties.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT), new String[] {"admin", "admin"}); } else { atlasClient = new AtlasClientV2(atlasProperties.getStringArray(HiveMetaStoreBridge.ATLAS_ENDPOINT)); } AtlasService service = new AtlasService(); + service.init(); STORE.registerListener(service); CurrentUser.authenticate(System.getProperty("user.name")); } + @Test + public void testCreateProcess() throws Exception { + Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random()); + + STORE.publish(EntityType.CLUSTER, cluster); + + assertClusterIsRegistered(cluster); + + Feed infeed = getTableFeed(FEED_RESOURCE, cluster.getName(), null); + String infeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(infeed.getName(), cluster.getName()))).getGuid(); + Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName()); + String outFeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName()))).getGuid(); + Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random()); + + process.getClusters().getClusters().get(0).setName(cluster.getName()); + process.getInputs().getInputs().get(0).setFeed(infeed.getName()); + process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName()); + + STORE.publish(EntityType.PROCESS, process); + + String pid = assertProcessIsRegistered(process, cluster.getName()); + AtlasEntity processEntity = atlasClient.getEntityByGuid(pid).getEntity(); + + assertNotNull(processEntity); + assertEquals(processEntity.getAttribute(AtlasClient.NAME), process.getName()); + assertEquals(getGuidFromObjectId(((List) processEntity.getAttribute("inputs")).get(0)), infeedId); + assertEquals(getGuidFromObjectId(((List) processEntity.getAttribute("outputs")).get(0)), outFeedId); + } + + @Test + public void testReplicationFeed() throws Exception { + Cluster srcCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random()); + + STORE.publish(EntityType.CLUSTER, srcCluster); + + assertClusterIsRegistered(srcCluster); + + Cluster targetCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random()); + + STORE.publish(EntityType.CLUSTER, targetCluster); + + assertClusterIsRegistered(targetCluster); + + Feed feed = getTableFeed(FEED_REPLICATION_RESOURCE, srcCluster.getName(), targetCluster.getName()); + String inId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feed.getName(), srcCluster.getName()))).getGuid(); + String outId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feed.getName(), targetCluster.getName()))).getGuid(); + String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_REPLICATION.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, feed.getName()); + AtlasEntity process = atlasClient.getEntityByGuid(processId).getEntity(); + + assertEquals(getGuidFromObjectId(((List) process.getAttribute("inputs")).get(0)), inId); + assertEquals(getGuidFromObjectId(((List) process.getAttribute("outputs")).get(0)), outId); + } + + @Test + public void testCreateProcessWithHDFSFeed() throws Exception { + Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random()); + + STORE.publish(EntityType.CLUSTER, cluster); + + TypesUtil.Pair result = getHDFSFeed(FEED_HDFS_RESOURCE, cluster.getName()); + Feed infeed = result.right; + String infeedId = result.left; + + Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName()); + String outfeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName()))).getGuid(); + Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random()); + + process.getClusters().getClusters().get(0).setName(cluster.getName()); + process.getInputs().getInputs().get(0).setFeed(infeed.getName()); + process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName()); + + STORE.publish(EntityType.PROCESS, process); + + String pid = assertProcessIsRegistered(process, cluster.getName()); + AtlasEntity processEntity = atlasClient.getEntityByGuid(pid).getEntity(); + + assertEquals(processEntity.getAttribute(AtlasClient.NAME), process.getName()); + assertEquals(processEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME), FalconBridge.getProcessQualifiedName(process.getName(), cluster.getName())); + assertEquals(getGuidFromObjectId(((List) processEntity.getAttribute("inputs")).get(0)), infeedId); + assertEquals(getGuidFromObjectId(((List) processEntity.getAttribute("outputs")).get(0)), outfeedId); + } + + /** + * Wait for a condition, expressed via a {@link Predicate} to become true. + * + * @param timeout maximum time in milliseconds to wait for the predicate to become true. + * @param predicate predicate waiting on. + */ + protected void waitFor(int timeout, Predicate predicate) throws Exception { + ParamChecker.notNull(predicate, "predicate"); + + long mustEnd = System.currentTimeMillis() + timeout; + + while (true) { + try { + predicate.evaluate(); + return; + } catch (Error | Exception e) { + if (System.currentTimeMillis() >= mustEnd) { + fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e); + } + + LOG.debug("Waiting up to {} msec as assertion failed", mustEnd - System.currentTimeMillis(), e); + + Thread.sleep(400); + } + } + } + private T loadEntity(EntityType type, String resource, String name) throws JAXBException { Entity entity = (Entity) type.getUnmarshaller().unmarshal(this.getClass().getResourceAsStream(resource)); + switch (entity.getEntityType()) { case CLUSTER: ((Cluster) entity).setName(name); @@ -101,7 +214,8 @@ private T loadEntity(EntityType type, String resource, String ((Process) entity).setName(name); break; } - return (T)entity; + + return (T) entity; } private String random() { @@ -112,67 +226,39 @@ private String getTableUri(String dbName, String tableName) { return String.format("catalog:%s:%s#ds=${YEAR}-${MONTH}-${DAY}-${HOUR}", dbName, tableName); } - @Test - public void testCreateProcess() throws Exception { - Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random()); - STORE.publish(EntityType.CLUSTER, cluster); - assertClusterIsRegistered(cluster); - - Feed infeed = getTableFeed(FEED_RESOURCE, cluster.getName(), null); - String infeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, - FalconBridge.getFeedQualifiedName(infeed.getName(), cluster.getName()))).getGuid(); - - Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName()); - String outFeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, - FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName()))).getGuid(); - - Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random()); - process.getClusters().getClusters().get(0).setName(cluster.getName()); - process.getInputs().getInputs().get(0).setFeed(infeed.getName()); - process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName()); - STORE.publish(EntityType.PROCESS, process); - - String pid = assertProcessIsRegistered(process, cluster.getName()); - AtlasEntity processEntity = atlasClient.getEntityByGuid(pid).getEntity(); - assertNotNull(processEntity); - assertEquals(processEntity.getAttribute(AtlasClient.NAME), process.getName()); - assertEquals(getGuidFromObjectId(((List)processEntity.getAttribute("inputs")).get(0)), infeedId); - assertEquals(getGuidFromObjectId(((List) processEntity.getAttribute("outputs")).get(0)), outFeedId); - } - private String assertProcessIsRegistered(Process process, String clusterName) throws Exception { - return assertEntityIsRegistered(FalconDataTypes.FALCON_PROCESS.getName(), - AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, - FalconBridge.getProcessQualifiedName(process.getName(), clusterName)); + return assertEntityIsRegistered(FalconDataTypes.FALCON_PROCESS.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getProcessQualifiedName(process.getName(), clusterName)); } private String assertClusterIsRegistered(Cluster cluster) throws Exception { - return assertEntityIsRegistered(FalconDataTypes.FALCON_CLUSTER.getName(), - AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, cluster.getName()); + return assertEntityIsRegistered(FalconDataTypes.FALCON_CLUSTER.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, cluster.getName()); } private TypesUtil.Pair getHDFSFeed(String feedResource, String clusterName) throws Exception { - Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random()); + Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random()); org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0); + feedCluster.setName(clusterName); STORE.publish(EntityType.FEED, feed); + String feedId = assertFeedIsRegistered(feed, clusterName); + assertFeedAttributes(feedId); - String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(), - AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, - FalconBridge.getFeedQualifiedName(feed.getName(), clusterName)); + String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feed.getName(), clusterName)); AtlasEntity processEntity = atlasClient.getEntityByGuid(processId).getEntity(); + assertEquals(getGuidFromObjectId(((List) processEntity.getAttribute("outputs")).get(0)), feedId); - String inputId = getGuidFromObjectId(((List) processEntity.getAttribute("inputs")).get(0)); + String inputId = getGuidFromObjectId(((List) processEntity.getAttribute("inputs")).get(0)); AtlasEntity pathEntity = atlasClient.getEntityByGuid(inputId).getEntity(); + assertEquals(pathEntity.getTypeName(), HiveMetaStoreBridge.HDFS_PATH); - List locations = FeedHelper.getLocations(feedCluster, feed); - Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA); - assertEquals(pathEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME), - FalconBridge.normalize(dataLocation.getPath())); + List locations = FeedHelper.getLocations(feedCluster, feed); + Location dataLocation = FileSystemStorage.getLocation(locations, LocationType.DATA); + + assertEquals(pathEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME), FalconBridge.normalize(dataLocation.getPath())); return TypesUtil.Pair.of(feedId, feed); } @@ -182,114 +268,67 @@ private Feed getTableFeed(String feedResource, String clusterName) throws Except } private Feed getTableFeed(String feedResource, String clusterName, String secondClusterName) throws Exception { - Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random()); + Feed feed = loadEntity(EntityType.FEED, feedResource, "feed" + random()); org.apache.falcon.entity.v0.feed.Cluster feedCluster = feed.getClusters().getClusters().get(0); + feedCluster.setName(clusterName); - String dbName = "db" + random(); + + String dbName = "db" + random(); String tableName = "table" + random(); + feedCluster.getTable().setUri(getTableUri(dbName, tableName)); - String dbName2 = "db" + random(); + String dbName2 = "db" + random(); String tableName2 = "table" + random(); if (secondClusterName != null) { org.apache.falcon.entity.v0.feed.Cluster feedCluster2 = feed.getClusters().getClusters().get(1); + feedCluster2.setName(secondClusterName); feedCluster2.getTable().setUri(getTableUri(dbName2, tableName2)); } STORE.publish(EntityType.FEED, feed); + String feedId = assertFeedIsRegistered(feed, clusterName); + assertFeedAttributes(feedId); verifyFeedLineage(feed.getName(), clusterName, feedId, dbName, tableName); if (secondClusterName != null) { String feedId2 = assertFeedIsRegistered(feed, secondClusterName); + assertFeedAttributes(feedId2); verifyFeedLineage(feed.getName(), secondClusterName, feedId2, dbName2, tableName2); } + return feed; } private void assertFeedAttributes(String feedId) throws Exception { AtlasEntity feedEntity = atlasClient.getEntityByGuid(feedId).getEntity(); + assertEquals(feedEntity.getAttribute(AtlasClient.OWNER), "testuser"); assertEquals(feedEntity.getAttribute(FalconBridge.FREQUENCY), "hours(1)"); assertEquals(feedEntity.getAttribute(AtlasClient.DESCRIPTION), "test input"); } - private void verifyFeedLineage(String feedName, String clusterName, String feedId, String dbName, String tableName) - throws Exception{ + private void verifyFeedLineage(String feedName, String clusterName, String feedId, String dbName, String tableName) throws Exception { //verify that lineage from hive table to falcon feed is created - String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(), - AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, - FalconBridge.getFeedQualifiedName(feedName, clusterName)); + String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_CREATION.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feedName, clusterName)); AtlasEntity processEntity = atlasClient.getEntityByGuid(processId).getEntity(); + assertEquals(getGuidFromObjectId(((List) processEntity.getAttribute("outputs")).get(0)), feedId); - String inputId = getGuidFromObjectId(((List) processEntity.getAttribute("inputs")).get(0)); + String inputId = getGuidFromObjectId(((List) processEntity.getAttribute("inputs")).get(0)); AtlasEntity tableEntity = atlasClient.getEntityByGuid(inputId).getEntity(); - assertEquals(tableEntity.getTypeName(), HiveDataTypes.HIVE_TABLE.getName()); - assertEquals(tableEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME), - HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName)); + assertEquals(tableEntity.getTypeName(), HiveDataTypes.HIVE_TABLE.getName()); + assertEquals(tableEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME), HiveMetaStoreBridge.getTableQualifiedName(clusterName, dbName, tableName)); } private String assertFeedIsRegistered(Feed feed, String clusterName) throws Exception { - return assertEntityIsRegistered(FalconDataTypes.FALCON_FEED.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, - FalconBridge.getFeedQualifiedName(feed.getName(), clusterName)); - } - - @Test - public void testReplicationFeed() throws Exception { - Cluster srcCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random()); - STORE.publish(EntityType.CLUSTER, srcCluster); - assertClusterIsRegistered(srcCluster); - - Cluster targetCluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random()); - STORE.publish(EntityType.CLUSTER, targetCluster); - assertClusterIsRegistered(targetCluster); - - Feed feed = getTableFeed(FEED_REPLICATION_RESOURCE, srcCluster.getName(), targetCluster.getName()); - String inId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, - FalconBridge.getFeedQualifiedName(feed.getName(), srcCluster.getName()))).getGuid(); - String outId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, - FalconBridge.getFeedQualifiedName(feed.getName(), targetCluster.getName()))).getGuid(); - - - String processId = assertEntityIsRegistered(FalconDataTypes.FALCON_FEED_REPLICATION.getName(), - AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, feed.getName()); - AtlasEntity process = atlasClient.getEntityByGuid(processId).getEntity(); - assertEquals(getGuidFromObjectId(((List) process.getAttribute("inputs")).get(0)), inId); - assertEquals(getGuidFromObjectId(((List) process.getAttribute("outputs")).get(0)), outId); - } - - @Test - public void testCreateProcessWithHDFSFeed() throws Exception { - Cluster cluster = loadEntity(EntityType.CLUSTER, CLUSTER_RESOURCE, "cluster" + random()); - STORE.publish(EntityType.CLUSTER, cluster); - - TypesUtil.Pair result = getHDFSFeed(FEED_HDFS_RESOURCE, cluster.getName()); - Feed infeed = result.right; - String infeedId = result.left; - - Feed outfeed = getTableFeed(FEED_RESOURCE, cluster.getName()); - String outfeedId = atlasClient.getEntityHeaderByAttribute(FalconDataTypes.FALCON_FEED.getName(), Collections.singletonMap(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, - FalconBridge.getFeedQualifiedName(outfeed.getName(), cluster.getName()))).getGuid(); - - Process process = loadEntity(EntityType.PROCESS, PROCESS_RESOURCE, "process" + random()); - process.getClusters().getClusters().get(0).setName(cluster.getName()); - process.getInputs().getInputs().get(0).setFeed(infeed.getName()); - process.getOutputs().getOutputs().get(0).setFeed(outfeed.getName()); - STORE.publish(EntityType.PROCESS, process); - - String pid = assertProcessIsRegistered(process, cluster.getName()); - AtlasEntity processEntity = atlasClient.getEntityByGuid(pid).getEntity(); - assertEquals(processEntity.getAttribute(AtlasClient.NAME), process.getName()); - assertEquals(processEntity.getAttribute(AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME), - FalconBridge.getProcessQualifiedName(process.getName(), cluster.getName())); - assertEquals(getGuidFromObjectId(((List) processEntity.getAttribute("inputs")).get(0)), infeedId); - assertEquals(getGuidFromObjectId(((List) processEntity.getAttribute("outputs")).get(0)), outfeedId); + return assertEntityIsRegistered(FalconDataTypes.FALCON_FEED.getName(), AtlasTypeUtil.ATTRIBUTE_QUALIFIED_NAME, FalconBridge.getFeedQualifiedName(feed.getName(), clusterName)); } private String assertEntityIsRegistered(final String typeName, final String property, final String value) throws Exception { @@ -297,10 +336,12 @@ private String assertEntityIsRegistered(final String typeName, final String prop @Override public void evaluate() throws Exception { AtlasEntity.AtlasEntityWithExtInfo entity = atlasClient.getEntityByAttribute(typeName, Collections.singletonMap(property, value)); + assertNotNull(entity); assertNotNull(entity.getEntity()); } }); + return atlasClient.getEntityHeaderByAttribute(typeName, Collections.singletonMap(property, value)).getGuid(); } @@ -323,28 +364,4 @@ public interface Predicate { */ void evaluate() throws Exception; } - - /** - * Wait for a condition, expressed via a {@link Predicate} to become true. - * - * @param timeout maximum time in milliseconds to wait for the predicate to become true. - * @param predicate predicate waiting on. - */ - protected void waitFor(int timeout, Predicate predicate) throws Exception { - ParamChecker.notNull(predicate, "predicate"); - long mustEnd = System.currentTimeMillis() + timeout; - - while (true) { - try { - predicate.evaluate(); - return; - } catch(Error | Exception e) { - if (System.currentTimeMillis() >= mustEnd) { - fail("Assertions failed. Failing after waiting for timeout " + timeout + " msecs", e); - } - LOG.debug("Waiting up to {} msec as assertion failed", mustEnd - System.currentTimeMillis(), e); - Thread.sleep(400); - } - } - } } diff --git a/addons/falcon-bridge/src/test/resources/atlas-application.properties b/addons/falcon-bridge/src/test/resources/atlas-application.properties index 0ce0f46c9b..94a75aab4a 100644 --- a/addons/falcon-bridge/src/test/resources/atlas-application.properties +++ b/addons/falcon-bridge/src/test/resources/atlas-application.properties @@ -15,54 +15,39 @@ # See the License for the specific language governing permissions and # limitations under the License. # - ######### Atlas Server Configs ######### atlas.rest.address=http://localhost:31000 - ######### Graph Database Configs ######### - - # Graph database implementation. Value inserted by maven. atlas.graphdb.backend=org.apache.atlas.repository.graphdb.janus.AtlasJanusGraphDatabase atlas.graph.index.search.solr.wait-searcher=true - # Graph Storage atlas.graph.storage.backend=berkeleyje - # Entity repository implementation atlas.EntityAuditRepository.impl=org.apache.atlas.repository.audit.InMemoryEntityAuditRepository - # Graph Search Index Backend atlas.graph.index.search.backend=solr - #Berkeley storage directory atlas.graph.storage.directory=${sys:atlas.data}/berkley atlas.graph.storage.transactions=true - #hbase #For standalone mode , specify localhost #for distributed mode, specify zookeeper quorum here - atlas.graph.storage.hostname=${graph.storage.hostname} atlas.graph.storage.hbase.regions-per-server=1 atlas.graph.storage.lock.wait-time=10000 - #ElasticSearch atlas.graph.index.search.directory=${sys:atlas.data}/es atlas.graph.index.search.elasticsearch.client-only=false atlas.graph.index.search.elasticsearch.local-mode=true atlas.graph.index.search.elasticsearch.create.sleep=2000 - # Solr cloud mode properties atlas.graph.index.search.solr.mode=cloud atlas.graph.index.search.solr.zookeeper-url=${solr.zk.address} atlas.graph.index.search.solr.embedded=true atlas.graph.index.search.max-result-set-size=150 - - ######### Notification Configs ######### atlas.notification.embedded=true - atlas.kafka.zookeeper.connect=localhost:19026 atlas.kafka.bootstrap.servers=localhost:19027 atlas.kafka.data=${sys:atlas.data}/kafka @@ -73,52 +58,38 @@ atlas.kafka.auto.commit.interval.ms=100 atlas.kafka.hook.group.id=atlas atlas.kafka.entities.group.id=atlas_entities #atlas.kafka.auto.commit.enable=false - atlas.kafka.enable.auto.commit=false atlas.kafka.auto.offset.reset=earliest atlas.kafka.session.timeout.ms=30000 atlas.kafka.offsets.topic.replication.factor=1 - - - ######### Entity Audit Configs ######### atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS atlas.audit.zookeeper.session.timeout.ms=1000 atlas.audit.hbase.zookeeper.quorum=localhost atlas.audit.hbase.zookeeper.property.clientPort=19026 - ######### Security Properties ######### - # SSL config atlas.enableTLS=false atlas.server.https.port=31443 - ######### Security Properties ######### - hbase.security.authentication=simple - atlas.hook.falcon.synchronous=true - ######### JAAS Configuration ######## - -atlas.jaas.KafkaClient.loginModuleName = com.sun.security.auth.module.Krb5LoginModule -atlas.jaas.KafkaClient.loginModuleControlFlag = required -atlas.jaas.KafkaClient.option.useKeyTab = true -atlas.jaas.KafkaClient.option.storeKey = true -atlas.jaas.KafkaClient.option.serviceName = kafka -atlas.jaas.KafkaClient.option.keyTab = /etc/security/keytabs/atlas.service.keytab -atlas.jaas.KafkaClient.option.principal = atlas/_HOST@EXAMPLE.COM - +atlas.jaas.KafkaClient.loginModuleName=com.sun.security.auth.module.Krb5LoginModule +atlas.jaas.KafkaClient.loginModuleControlFlag=required +atlas.jaas.KafkaClient.option.useKeyTab=true +atlas.jaas.KafkaClient.option.storeKey=true +atlas.jaas.KafkaClient.option.serviceName=kafka +atlas.jaas.KafkaClient.option.keyTab=/etc/security/keytabs/atlas.service.keytab +atlas.jaas.KafkaClient.option.principal=atlas/_HOST@EXAMPLE.COM ######### High Availability Configuration ######## atlas.server.ha.enabled=false #atlas.server.ids=id1 #atlas.server.address.id1=localhost:21000 - ######### Atlas Authorization ######### atlas.authorizer.impl=none # atlas.authorizer.impl=simple # atlas.authorizer.simple.authz.policy.file=atlas-simple-authz-policy.json - ######### Atlas Authentication ######### atlas.authentication.method.file=true atlas.authentication.method.ldap.type=none diff --git a/addons/falcon-bridge/src/test/resources/atlas-logback.xml b/addons/falcon-bridge/src/test/resources/atlas-logback.xml index 78fd420dc8..991cb621de 100755 --- a/addons/falcon-bridge/src/test/resources/atlas-logback.xml +++ b/addons/falcon-bridge/src/test/resources/atlas-logback.xml @@ -18,115 +18,114 @@ --> - - - - %date [%thread] %level{5} [%file:%line] %msg%n - - - INFO - - - - - ${atlas.log.dir}/${atlas.log.file} - true - - %date [%thread] %level{5} [%file:%line] %msg%n - - - ${atlas.log.dir}/${atlas.log.file}-%d - 20 - true - - - - - ${atlas.log.dir}/audit.log - true - - %date [%thread] %level{5} [%file:%line] %msg%n - - - ${atlas.log.dir}/audit-%d.log - 20 - false - - - - - ${atlas.log.dir}/metrics.log - true - - %date [%thread] %level{5} [%file:%line] %msg%n - - - ${atlas.log.dir}/metrics-%d.log - 20 - false - - - - - ${atlas.log.dir}/failed.log - true - - %date [%thread] %level{5} [%file:%line] %msg%n - - - ${atlas.log.dir}/failed-%d.log - 20 - false - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + %date [%thread] %level{5} [%file:%line] %msg%n + + + INFO + + + + + ${atlas.log.dir}/${atlas.log.file} + true + + %date [%thread] %level{5} [%file:%line] %msg%n + + + ${atlas.log.dir}/${atlas.log.file}-%d + 20 + true + + + + + ${atlas.log.dir}/audit.log + true + + %date [%thread] %level{5} [%file:%line] %msg%n + + + ${atlas.log.dir}/audit-%d.log + 20 + false + + + + + ${atlas.log.dir}/metrics.log + true + + %date [%thread] %level{5} [%file:%line] %msg%n + + + ${atlas.log.dir}/metrics-%d.log + 20 + false + + + + + ${atlas.log.dir}/failed.log + true + + %date [%thread] %level{5} [%file:%line] %msg%n + + + ${atlas.log.dir}/failed-%d.log + 20 + false + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/addons/falcon-bridge/src/test/resources/cluster.xml b/addons/falcon-bridge/src/test/resources/cluster.xml index b183847db3..b7bbda74c1 100644 --- a/addons/falcon-bridge/src/test/resources/cluster.xml +++ b/addons/falcon-bridge/src/test/resources/cluster.xml @@ -20,7 +20,7 @@ Primary cluster configuration for demo vm --> + xmlns="uri:falcon:cluster:0.1"> diff --git a/addons/falcon-bridge/src/test/resources/feed-hdfs.xml b/addons/falcon-bridge/src/test/resources/feed-hdfs.xml index 435db07451..582998a3c0 100644 --- a/addons/falcon-bridge/src/test/resources/feed-hdfs.xml +++ b/addons/falcon-bridge/src/test/resources/feed-hdfs.xml @@ -21,19 +21,19 @@ hours(1) UTC - + - - + + - + - - + + diff --git a/addons/falcon-bridge/src/test/resources/feed-replication.xml b/addons/falcon-bridge/src/test/resources/feed-replication.xml index dcd427b180..42c4f863f8 100644 --- a/addons/falcon-bridge/src/test/resources/feed-replication.xml +++ b/addons/falcon-bridge/src/test/resources/feed-replication.xml @@ -21,23 +21,23 @@ hours(1) UTC - + - - + + - - + +
- - + + diff --git a/addons/falcon-bridge/src/test/resources/feed.xml b/addons/falcon-bridge/src/test/resources/feed.xml index 473c745ce8..f58316b77e 100644 --- a/addons/falcon-bridge/src/test/resources/feed.xml +++ b/addons/falcon-bridge/src/test/resources/feed.xml @@ -21,18 +21,18 @@ hours(1) UTC - + - - + +
- - + + diff --git a/addons/falcon-bridge/src/test/resources/process.xml b/addons/falcon-bridge/src/test/resources/process.xml index b94d0a8470..62e7542f18 100644 --- a/addons/falcon-bridge/src/test/resources/process.xml +++ b/addons/falcon-bridge/src/test/resources/process.xml @@ -22,7 +22,7 @@ - + @@ -32,22 +32,22 @@ UTC - + - + - + - + - + - + diff --git a/addons/falcon-bridge/src/test/resources/startup.properties b/addons/falcon-bridge/src/test/resources/startup.properties index 9623470396..661a2be89e 100644 --- a/addons/falcon-bridge/src/test/resources/startup.properties +++ b/addons/falcon-bridge/src/test/resources/startup.properties @@ -15,7 +15,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # - *.domain=debug *.config.store.persist=false *.config.store.uri=target/config_store \ No newline at end of file