CypherTemporalPatternMatching.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.matching.single.cypher;
import com.google.common.collect.Sets;
import org.apache.flink.api.java.DataSet;
import org.apache.log4j.Logger;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.flink.model.impl.operators.matching.common.MatchStrategy;
import org.gradoop.flink.model.impl.operators.matching.common.PostProcessor;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.debug.PrintEmbedding;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.operators.add.AddEmbeddingsElements;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.operators.project.ProjectEmbeddingsElements;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.planning.queryplan.QueryPlan;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.Embedding;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.EmbeddingMetaData;
import org.gradoop.temporal.model.impl.TemporalGraph;
import org.gradoop.temporal.model.impl.TemporalGraphCollection;
import org.gradoop.temporal.model.impl.operators.matching.common.query.TemporalQueryHandler;
import org.gradoop.temporal.model.impl.operators.matching.common.query.postprocessing.CNFPostProcessing;
import org.gradoop.temporal.model.impl.operators.matching.common.query.postprocessing.exceptions.QueryContradictoryException;
import org.gradoop.temporal.model.impl.operators.matching.common.statistics.TemporalGraphStatistics;
import org.gradoop.temporal.model.impl.operators.matching.single.TemporalPatternMatching;
import org.gradoop.temporal.model.impl.operators.matching.single.cypher.functions.ElementsFromEmbeddingTPGM;
import org.gradoop.temporal.model.impl.operators.matching.single.cypher.planning.planner.greedy.GreedyPlanner;
import org.gradoop.temporal.model.impl.pojo.TemporalEdge;
import org.gradoop.temporal.model.impl.pojo.TemporalGraphHead;
import org.gradoop.temporal.model.impl.pojo.TemporalVertex;
import java.util.ArrayList;
import java.util.Objects;
import java.util.Set;
import static com.google.common.collect.Sets.difference;
import static com.google.common.collect.Sets.intersection;
import static org.gradoop.flink.model.impl.operators.matching.common.debug.Printer.log;
/**
 * Implementation of a query engine based on the Cypher graph query language for temporal
 * property graphs ({@link TemporalGraph}).
 */
public class CypherTemporalPatternMatching
  extends TemporalPatternMatching<TemporalGraphHead, TemporalGraph, TemporalGraphCollection> {

  /**
   * Logger
   */
  private static final Logger LOG = Logger.getLogger(CypherTemporalPatternMatching.class);

  /**
   * Construction pattern for result transformation, or {@code null} if the matched embeddings
   * are converted into the result without any transformation.
   */
  private final String constructionPattern;

  /**
   * Morphism strategy for vertex mappings
   */
  private final MatchStrategy vertexStrategy;

  /**
   * Morphism strategy for edge mappings
   */
  private final MatchStrategy edgeStrategy;

  /**
   * Statistics about the data graph
   */
  private final TemporalGraphStatistics graphStatistics;

  /**
   * Instantiates a new operator without a construction pattern.
   *
   * @param query           Cypher query string
   * @param attachData      true, if original data shall be attached to the result
   * @param vertexStrategy  morphism strategy for vertex mappings
   * @param edgeStrategy    morphism strategy for edge mappings
   * @param graphStatistics statistics about the data graph
   * @param postprocessor   postprocessing pipeline for the query CNF
   */
  public CypherTemporalPatternMatching(String query, boolean attachData,
    MatchStrategy vertexStrategy, MatchStrategy edgeStrategy,
    TemporalGraphStatistics graphStatistics,
    CNFPostProcessing postprocessor) {
    this(query, null, attachData, vertexStrategy, edgeStrategy, graphStatistics, postprocessor);
  }

  /**
   * Instantiates a new operator.
   *
   * @param query               Cypher query string
   * @param constructionPattern construction pattern used to transform the matches, may be
   *                            {@code null} to return the matches untransformed
   * @param attachData          true, if original data shall be attached to the result
   * @param vertexStrategy      morphism strategy for vertex mappings
   * @param edgeStrategy        morphism strategy for edge mappings
   * @param graphStatistics     statistics about the data graph
   * @param postprocessor       postprocessing pipeline for the query CNF
   */
  public CypherTemporalPatternMatching(String query, String constructionPattern, boolean attachData,
    MatchStrategy vertexStrategy, MatchStrategy edgeStrategy,
    TemporalGraphStatistics graphStatistics,
    CNFPostProcessing postprocessor) {
    super(query, attachData, postprocessor, LOG);
    this.constructionPattern = constructionPattern;
    this.vertexStrategy = vertexStrategy;
    this.edgeStrategy = edgeStrategy;
    this.graphStatistics = graphStatistics;
  }

  /**
   * Single-vertex queries are handled by the general pattern execution.
   *
   * @param graph data graph
   * @return graph collection containing the matches
   */
  @Override
  protected TemporalGraphCollection executeForVertex(TemporalGraph graph) {
    return executeForPattern(graph);
  }

  /**
   * Plans and executes the query on the given graph and converts the resulting embeddings
   * into a graph collection (optionally transformed by the construction pattern).
   *
   * @param graph data graph
   * @return graph collection containing the matches
   */
  @Override
  protected TemporalGraphCollection executeForPattern(TemporalGraph graph) {
    // Query planning
    TemporalQueryHandler queryHandler = getQueryHandler();
    QueryPlan plan =
      new GreedyPlanner<>(graph, queryHandler, graphStatistics, vertexStrategy, edgeStrategy).plan()
        .getQueryPlan();

    // Query execution
    DataSet<Embedding> embeddings = plan.execute();
    EmbeddingMetaData embeddingMetaData = plan.getRoot().getEmbeddingMetaData();

    // pass the embeddings through the debug printer
    embeddings = log(embeddings, new PrintEmbedding(embeddingMetaData),
      getVertexMapping(), getEdgeMapping());

    // Pattern construction (if a construction pattern was given)
    DataSet<Element> finalElements = this.constructionPattern != null ?
      constructFinalElements(graph, embeddings, embeddingMetaData) :
      embeddings.flatMap(
        new ElementsFromEmbeddingTPGM<TemporalGraphHead, TemporalVertex, TemporalEdge>(
          graph.getFactory().getGraphHeadFactory(),
          graph.getFactory().getVertexFactory(),
          graph.getFactory().getEdgeFactory(),
          embeddingMetaData,
          queryHandler.getSourceTargetVariables()));

    // Post processing
    return doAttachData() ?
      PostProcessor.extractGraphCollectionWithData(finalElements, graph, true) :
      PostProcessor.extractGraphCollection(finalElements, graph.getCollectionFactory(), true);
  }

  @Override
  protected TemporalGraphCollection emptyCollection(TemporalGraph graph) {
    return graph.getCollectionFactory().createEmptyCollection();
  }

  /**
   * Transforms the matched embeddings according to the construction pattern and converts
   * them into elements.
   * <p>
   * Variables of the construction pattern that are also bound by the query are projected
   * from the existing embeddings; variables that only occur in the construction pattern are
   * added as new embedding entries.
   *
   * @param graph             data graph (used for its element factories)
   * @param embeddings        embeddings produced by the query
   * @param embeddingMetaData meta data describing {@code embeddings}
   * @return elements described by the construction pattern
   * @throws IllegalStateException if the construction pattern cannot be handled
   */
  private DataSet<Element> constructFinalElements(TemporalGraph graph, DataSet<Embedding> embeddings,
    EmbeddingMetaData embeddingMetaData) {
    TemporalQueryHandler constructionPatternHandler = createConstructionPatternHandler();
    // prefix generated variable names of the construction pattern with "_"
    // (presumably to keep them distinct from generated names of the match query)
    constructionPatternHandler.updateGeneratedVariableNames(n -> "_" + n);

    Set<String> queryVars = Sets.newHashSet(embeddingMetaData.getVariables());
    Set<String> constructionVars = constructionPatternHandler.getAllVariables();
    // variables bound by the query and reused by the construction pattern
    Set<String> existingVars = intersection(queryVars, constructionVars).immutableCopy();
    // variables that only occur in the construction pattern
    Set<String> newVars = difference(constructionVars, queryVars).immutableCopy();

    EmbeddingMetaData newMetaData = computeNewMetaData(
      embeddingMetaData, constructionPatternHandler, existingVars, newVars);

    // project existing embedding elements to new embeddings
    ProjectEmbeddingsElements projectedEmbeddings =
      new ProjectEmbeddingsElements(embeddings, existingVars, embeddingMetaData, newMetaData);
    // add new embedding elements
    AddEmbeddingsElements addEmbeddingsElements =
      new AddEmbeddingsElements(projectedEmbeddings.evaluate(), newVars.size());

    return addEmbeddingsElements.evaluate().flatMap(
      new ElementsFromEmbeddingTPGM<>(
        graph.getFactory().getGraphHeadFactory(),
        graph.getFactory().getVertexFactory(),
        graph.getFactory().getEdgeFactory(),
        newMetaData,
        constructionPatternHandler.getSourceTargetVariables(),
        constructionPatternHandler.getLabelsForVariables(newVars)));
  }

  /**
   * Parses the construction pattern into a query handler.
   *
   * @return query handler for the construction pattern
   * @throws NullPointerException  if no construction pattern was set
   * @throws IllegalStateException if the construction pattern is reported as contradictory
   */
  private TemporalQueryHandler createConstructionPatternHandler() {
    Objects.requireNonNull(constructionPattern, "construction pattern must not be null");
    try {
      // no postprocessing needed, the construction pattern is not a query
      return new TemporalQueryHandler(constructionPattern, new CNFPostProcessing(new ArrayList<>()));
    } catch (QueryContradictoryException e) {
      // not expected, as a construction pattern does not contain conditions;
      // fail loudly and keep the cause instead of continuing with a null handler
      throw new IllegalStateException(
        "Construction pattern was reported as contradictory: " + constructionPattern, e);
    }
  }

  /**
   * Computes the meta data of the transformed embeddings.
   * <p>
   * Each existing variable is appended as the next column (keeping its entry type from
   * {@code metaData}), followed by each new variable, typed as edge or vertex according to
   * the construction pattern.
   *
   * @param metaData             meta data of the embeddings produced by the query
   * @param returnPatternHandler query handler of the construction pattern
   * @param existingVariables    variables bound by the query and used by the pattern
   * @param newVariables         variables only occurring in the pattern
   * @return new EmbeddingMetaData
   */
  private static EmbeddingMetaData computeNewMetaData(EmbeddingMetaData metaData,
    TemporalQueryHandler returnPatternHandler,
    Set<String> existingVariables, Set<String> newVariables) {
    EmbeddingMetaData newMetaData = new EmbeddingMetaData();
    // case 1: keep the existing variables referenced by the construction pattern
    for (String var : existingVariables) {
      newMetaData.setEntryColumn(var, metaData.getEntryType(var), newMetaData.getEntryCount());
    }
    // case 2: add new vertices and edges
    for (String var : newVariables) {
      EmbeddingMetaData.EntryType type = returnPatternHandler.isEdge(var) ?
        EmbeddingMetaData.EntryType.EDGE :
        EmbeddingMetaData.EntryType.VERTEX;
      newMetaData.setEntryColumn(var, type, newMetaData.getEntryCount());
    }
    return newMetaData;
  }
}