CypherPatternMatching.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.matching.single.cypher;
import com.google.common.collect.Sets;
import org.apache.flink.api.java.DataSet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.impl.operators.matching.common.MatchStrategy;
import org.gradoop.flink.model.impl.operators.matching.common.PostProcessor;
import org.gradoop.flink.model.impl.operators.matching.common.query.QueryHandler;
import org.gradoop.flink.model.impl.operators.matching.common.statistics.GraphStatistics;
import org.gradoop.flink.model.impl.operators.matching.single.PatternMatching;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.debug.PrintEmbedding;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.functions.ElementsFromEmbedding;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.operators.add.AddEmbeddingsElements;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.operators.project.ProjectEmbeddingsElements;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.planning.planner.greedy.GreedyPlanner;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.planning.queryplan.QueryPlan;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.Embedding;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.EmbeddingMetaData;
import java.util.Set;
import static com.google.common.collect.Sets.difference;
import static com.google.common.collect.Sets.intersection;
import static org.gradoop.flink.model.impl.operators.matching.common.debug.Printer.log;
/**
* Implementation of a query engine based on the Cypher graph query language.
*
* @param <G> The graph head type.
* @param <V> The vertex type.
* @param <E> The edge type.
* @param <LG> The graph type.
* @param <GC> The graph collection type.
*/
public class CypherPatternMatching<
G extends GraphHead,
V extends Vertex,
E extends Edge,
LG extends BaseGraph<G, V, E, LG, GC>,
GC extends BaseGraphCollection<G, V, E, LG, GC>> extends PatternMatching<G, V, E, LG, GC> {
/**
* Logger
*/
private static final Logger LOG = LogManager.getLogger(CypherPatternMatching.class);
/**
* Construction pattern for result transformation.
*/
private final String constructionPattern;
/**
* Morphism strategy for vertex mappings
*/
private final MatchStrategy vertexStrategy;
/**
* Morphism strategy for edge mappings
*/
private final MatchStrategy edgeStrategy;
/**
* Statistics about the data graph
*/
private final GraphStatistics graphStatistics;
/**
* Instantiates a new operator.
*
* @param query Cypher query string
* @param attachData true, if original data shall be attached to the result
* @param vertexStrategy morphism strategy for vertex mappings
* @param edgeStrategy morphism strategy for edge mappings
* @param graphStatistics statistics about the data graph
*/
public CypherPatternMatching(String query, boolean attachData, MatchStrategy vertexStrategy,
MatchStrategy edgeStrategy, GraphStatistics graphStatistics) {
this(query, null, attachData, vertexStrategy, edgeStrategy, graphStatistics);
}
/**
* Instantiates a new operator.
*
* @param query Cypher query string
* @param constructionPattern Construction pattern
* @param attachData true, if original data shall be attached to the result
* @param vertexStrategy morphism strategy for vertex mappings
* @param edgeStrategy morphism strategy for edge mappings
* @param graphStatistics statistics about the data graph
*/
public CypherPatternMatching(String query, String constructionPattern, boolean attachData,
MatchStrategy vertexStrategy, MatchStrategy edgeStrategy, GraphStatistics graphStatistics) {
super(query, attachData, LOG);
this.constructionPattern = constructionPattern;
this.vertexStrategy = vertexStrategy;
this.edgeStrategy = edgeStrategy;
this.graphStatistics = graphStatistics;
}
@Override
protected GC executeForVertex(LG graph) {
return executeForPattern(graph);
}
@Override
protected GC executeForPattern(LG graph) {
// Query planning
QueryHandler queryHandler = getQueryHandler();
QueryPlan plan =
new GreedyPlanner<>(graph, queryHandler, graphStatistics, vertexStrategy, edgeStrategy).plan()
.getQueryPlan();
// Query execution
DataSet<Embedding> embeddings = plan.execute();
EmbeddingMetaData embeddingMetaData = plan.getRoot().getEmbeddingMetaData();
embeddings =
log(embeddings, new PrintEmbedding(embeddingMetaData), getVertexMapping(), getEdgeMapping());
// Pattern construction (if necessary)
DataSet<Element> finalElements = this.constructionPattern != null ?
constructFinalElements(graph, embeddings, embeddingMetaData) :
embeddings.flatMap(
new ElementsFromEmbedding<>(
graph.getFactory().getGraphHeadFactory(),
graph.getFactory().getVertexFactory(),
graph.getFactory().getEdgeFactory(),
embeddingMetaData,
queryHandler.getSourceTargetVariables()));
// Post processing
return
doAttachData() ? PostProcessor.extractGraphCollectionWithData(finalElements, graph, true) :
PostProcessor.extractGraphCollection(finalElements, graph.getCollectionFactory(), true);
}
/**
* Method to construct final embedded elements
*
* @param graph Used logical graph
* @param embeddings embeddings
* @param embeddingMetaData Meta information
* @return New set of EmbeddingElements
*/
private DataSet<Element> constructFinalElements(LG graph, DataSet<Embedding> embeddings,
EmbeddingMetaData embeddingMetaData) {
QueryHandler constructionPatternHandler = new QueryHandler(this.constructionPattern);
constructionPatternHandler.updateGeneratedVariableNames(n -> "_" + n);
Set<String> queryVars = Sets.newHashSet(embeddingMetaData.getVariables());
Set<String> constructionVars = constructionPatternHandler.getAllVariables();
Set<String> existingVars = intersection(queryVars, constructionVars).immutableCopy();
Set<String> newVars = difference(constructionVars, queryVars).immutableCopy();
EmbeddingMetaData newMetaData = computeNewMetaData(
embeddingMetaData, constructionPatternHandler, existingVars, newVars);
// project existing embedding elements to new embeddings
ProjectEmbeddingsElements projectedEmbeddings =
new ProjectEmbeddingsElements(embeddings, existingVars, embeddingMetaData, newMetaData);
// add new embedding elements
AddEmbeddingsElements addEmbeddingsElements =
new AddEmbeddingsElements(projectedEmbeddings.evaluate(), newVars.size());
return addEmbeddingsElements.evaluate().flatMap(
new ElementsFromEmbedding<>(
graph.getFactory().getGraphHeadFactory(),
graph.getFactory().getVertexFactory(),
graph.getFactory().getEdgeFactory(),
newMetaData,
constructionPatternHandler.getSourceTargetVariables(),
constructionPatternHandler.getLabelsForVariables(newVars)));
}
/**
* Compute new meta information
*
* @param metaData old meta information
* @param returnPatternHandler pattern handler
* @param existingVariables old variables
* @param newVariables new variables
* @return new EmbeddingMetaData
*/
private EmbeddingMetaData computeNewMetaData(EmbeddingMetaData metaData,
QueryHandler returnPatternHandler, Set<String> existingVariables, Set<String> newVariables) {
// update meta data
EmbeddingMetaData newMetaData = new EmbeddingMetaData();
// case 1: Filter existing embeddings based on return pattern
for (String var : existingVariables) {
newMetaData.setEntryColumn(var, metaData.getEntryType(var), newMetaData.getEntryCount());
}
// case 2: Add new vertices and edges
for (String var : newVariables) {
EmbeddingMetaData.EntryType type = returnPatternHandler.isEdge(var) ?
EmbeddingMetaData.EntryType.EDGE :
EmbeddingMetaData.EntryType.VERTEX;
newMetaData.setEntryColumn(var, type, newMetaData.getEntryCount());
}
return newMetaData;
}
}