TemporalQueryHandler.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.matching.common.query;
import org.gradoop.flink.model.impl.operators.matching.common.query.QueryHandler;
import org.gradoop.flink.model.impl.operators.matching.common.query.predicates.CNF;
import org.gradoop.flink.model.impl.operators.matching.common.query.predicates.QueryPredicate;
import org.gradoop.gdl.GDLHandler;
import org.gradoop.gdl.model.Edge;
import org.gradoop.gdl.model.Vertex;
import org.gradoop.gdl.model.comparables.time.MaxTimePoint;
import org.gradoop.gdl.model.comparables.time.MinTimePoint;
import org.gradoop.gdl.model.comparables.time.TimeLiteral;
import org.gradoop.gdl.model.comparables.time.TimeSelector;
import org.gradoop.gdl.model.predicates.Predicate;
import org.gradoop.gdl.model.predicates.booleans.And;
import org.gradoop.gdl.model.predicates.expressions.Comparison;
import org.gradoop.temporal.model.impl.operators.matching.common.query.postprocessing.CNFPostProcessing;
import org.gradoop.temporal.model.impl.operators.matching.common.query.postprocessing.exceptions.QueryContradictoryException;
import org.gradoop.temporal.model.impl.operators.matching.common.query.predicates.ComparableTPGMFactory;
import java.util.HashMap;
import java.util.Map;
import static org.gradoop.gdl.utils.Comparator.LTE;
/**
* Wraps a {@link GDLHandler} and adds functionality needed for query
* processing during graph pattern matching.
* Extension of the flink QueryHandler for temporal queries
*/
public class TemporalQueryHandler extends QueryHandler {
/**
* Time Literal representing the system time at the start of query processing
*/
private final TimeLiteral now;
/**
* Transformations to apply to the query cnf
*/
private final CNFPostProcessing cnfPostProcessing;
/**
* The query CNF
*/
private CNF cnf;
/**
* Creates a new query handler that postprocesses the CNF with a default
* {@link CNFPostProcessing}
*
* @param gdlString GDL query string
* @throws QueryContradictoryException if a contradiction in the query is encountered
*/
public TemporalQueryHandler(String gdlString) throws QueryContradictoryException {
this(gdlString, new CNFPostProcessing());
}
/**
* Creates a new query handler.
*
* @param gdlString GDL query string
* @param postProcessing transformations to apply to the CNF
* @throws QueryContradictoryException if a contradiction in the query is encountered
*/
public TemporalQueryHandler(String gdlString, CNFPostProcessing postProcessing)
throws QueryContradictoryException {
super(gdlString);
this.cnfPostProcessing = postProcessing;
now = new TimeLiteral("now");
initCNF();
}
/**
* Returns all available predicates in Conjunctive Normal Form {@link CNF}. If there are no
* predicates defined in the query, a CNF containing zero predicates is returned.
*
* @return predicates
*/
@Override
public CNF getPredicates() {
return cnf;
}
/**
* Initiates the query CNF from the query, including postprocessing
*
* @throws QueryContradictoryException if query is found to be contradictory
*/
private void initCNF() throws QueryContradictoryException {
if (getGdlHandler().getPredicates().isPresent()) {
Predicate predicate = getGdlHandler().getPredicates().get();
// some transformations on the GDL query before conversion to CNF
predicate = preprocessPredicate(predicate);
CNF rawCNF = QueryPredicate.createFrom(predicate, new ComparableTPGMFactory(now)).asCNF();
cnf = cnfPostProcessing.postprocess(rawCNF);
} else {
// empty CNF
cnf = new CNF();
}
}
/**
* Pipeline for preprocessing GDL query predicates. Currently, only predicates for
* valid interval overlap of edges and their vertices are added.
*
* @param predicate the GDL predicate to preprocess
* @return preprocessed GDL predicate
*/
private Predicate preprocessPredicate(Predicate predicate) {
return addEdgeOverlapPredicates(predicate);
}
/**
* Augments a predicate by constraints ensuring that valid times of
* edges and their adjacent nodes overlap
*
* @param predicate the predicate to augment
* @return predicate augmented by edge overlaps constraints for every edge
*/
private Predicate addEdgeOverlapPredicates(Predicate predicate) {
GDLHandler handler = getGdlHandler();
Map<String, Edge> edgeCache = handler.getEdgeCache();
// maps vertex ids to vertex variables
Map<Long, String> vertexMap = new HashMap<>();
for (Vertex vertex : handler.getVertexCache(true, true).values()) {
vertexMap.put(vertex.getId(), vertex.getVariable());
}
// determine all (vertex, edge, vertex) triples
for (String edgeVar : handler.getEdgeCache().keySet()) {
Edge edge = edgeCache.get(edgeVar);
// no temporal predicates for paths
if (edge.hasVariableLength()) {
continue;
}
String sourceVar = vertexMap.get(edge.getSourceVertexId());
String targetVar = vertexMap.get(edge.getTargetVertexId());
// all relevant val-selectors
TimeSelector eFrom = new TimeSelector(edgeVar, TimeSelector.TimeField.VAL_FROM);
TimeSelector eTo = new TimeSelector(edgeVar, TimeSelector.TimeField.VAL_TO);
TimeSelector sFrom = new TimeSelector(sourceVar, TimeSelector.TimeField.VAL_FROM);
TimeSelector sTo = new TimeSelector(sourceVar, TimeSelector.TimeField.VAL_TO);
TimeSelector tFrom = new TimeSelector(targetVar, TimeSelector.TimeField.VAL_FROM);
TimeSelector tTo = new TimeSelector(targetVar, TimeSelector.TimeField.VAL_TO);
// create predicates edgeVar.val.overlaps(sourceVar.val) AND
// edgeVar.val.overlaps(targetVar.val)...
Predicate overlaps = new Comparison(
new MaxTimePoint(eFrom, sFrom, tFrom), LTE, new MinTimePoint(eTo, sTo, tTo)
);
// ...and add them to the resulting predicate
predicate = new And(predicate, overlaps);
}
return predicate;
}
/**
* Returns the TimeLiteral representing the systime at the start of query processing
*
* @return TimeLiteral representing the systime at the start of query processing
*/
public TimeLiteral getNow() {
return now;
}
/**
* Checks if the graph returns a single vertex and no edges (no loops).
*
* @return true, if single vertex graph
*/
public boolean isSingleVertexGraph() {
return getVertexCount() == 1 && getEdgeCount() == 0;
}
/**
* Returns the number of vertices in the query graph.
*
* @return vertex count
*/
public int getVertexCount() {
return getVertices().size();
}
/**
* Returns the number of edges in the query graph.
*
* @return edge count
*/
public int getEdgeCount() {
return getEdges().size();
}
}