// FilterAndProjectTriple.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.matching.single.cypher.operators.filter.functions;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;
import org.gradoop.flink.model.impl.operators.matching.common.MatchStrategy;
import org.gradoop.flink.model.impl.operators.matching.common.query.predicates.CNF;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.Embedding;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.EmbeddingFactory;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.EmbeddingMetaData;
import org.gradoop.flink.model.impl.operators.matching.single.cypher.pojos.Triple;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* Applies a given predicate on a {@link Triple} and projects specified property values to the
* output embedding.
*/
public class FilterAndProjectTriple extends RichFlatMapFunction<Triple, Embedding> {
  /**
   * Predicates the triple has to satisfy
   */
  private final CNF predicates;
  /**
   * Query variable assigned to the source vertex
   */
  private final String sourceVariable;
  /**
   * Query variable assigned to the target vertex
   */
  private final String targetVariable;
  /**
   * Property keys of the source vertex that are projected into the output embedding
   */
  private final List<String> sourceProjectionPropertyKeys;
  /**
   * Property keys of the edge that are projected into the output embedding
   */
  private final List<String> edgeProjectionPropertyKeys;
  /**
   * Property keys of the target vertex that are projected into the output embedding
   */
  private final List<String> targetProjectionPropertyKeys;
  /**
   * Meta data describing the intermediate embedding the predicates are evaluated on
   */
  private final EmbeddingMetaData filterMetaData;
  /**
   * Source vertex property keys contained in the embedding used for filtering
   */
  private final List<String> sourceFilterPropertyKeys;
  /**
   * Edge property keys contained in the embedding used for filtering
   */
  private final List<String> edgeFilterPropertyKeys;
  /**
   * Target vertex property keys contained in the embedding used for filtering
   */
  private final List<String> targetFilterPropertyKeys;
  /**
   * True if source and target variable are the same
   */
  private final boolean isLoop;
  /**
   * True if the vertex match strategy is isomorphism
   */
  private final boolean isVertexIso;

  /**
   * Creates a new triple filter and projection function.
   *
   * @param sourceVariable the source variable
   * @param edgeVariable the edge variable
   * @param targetVariable the target variable
   * @param predicates filter predicates
   * @param projectionPropertyKeys property keys used for projection, looked up by variable;
   *                               a variable without an entry projects no properties
   * @param vertexMatchStrategy vertex match strategy
   */
  public FilterAndProjectTriple(String sourceVariable, String edgeVariable, String targetVariable,
    CNF predicates, Map<String, List<String>> projectionPropertyKeys,
    MatchStrategy vertexMatchStrategy) {

    this.predicates = predicates;
    this.sourceVariable = sourceVariable;
    this.targetVariable = targetVariable;

    this.sourceProjectionPropertyKeys = projectionKeysOf(projectionPropertyKeys, sourceVariable);
    this.edgeProjectionPropertyKeys = projectionKeysOf(projectionPropertyKeys, edgeVariable);
    this.targetProjectionPropertyKeys = projectionKeysOf(projectionPropertyKeys, targetVariable);

    this.isLoop = sourceVariable.equals(targetVariable);
    this.isVertexIso = vertexMatchStrategy.equals(MatchStrategy.ISOMORPHISM);

    // The filter embedding only carries the properties that the predicates refer to.
    this.filterMetaData =
      createFilterMetaData(predicates, sourceVariable, edgeVariable, targetVariable);
    this.sourceFilterPropertyKeys = this.filterMetaData.getPropertyKeys(sourceVariable);
    this.edgeFilterPropertyKeys = this.filterMetaData.getPropertyKeys(edgeVariable);
    this.targetFilterPropertyKeys = this.filterMetaData.getPropertyKeys(targetVariable);
  }

  /**
   * Emits the projected embedding of a triple if its endpoints are admissible for the query
   * edge and the triple satisfies the predicates; otherwise nothing is emitted.
   *
   * @param triple triple to be filtered and projected
   * @param out collector receiving at most one embedding per triple
   * @throws Exception declared by the overridden method
   */
  @Override
  public void flatMap(Triple triple, Collector<Embedding> out) throws Exception {
    if (!hasAdmissibleEndpoints(triple) || !filter(triple)) {
      return;
    }

    Embedding projected = EmbeddingFactory.fromTriple(
      triple,
      sourceProjectionPropertyKeys, edgeProjectionPropertyKeys, targetProjectionPropertyKeys,
      sourceVariable, targetVariable
    );
    out.collect(projected);
  }

  /**
   * Checks the source and target ids of the triple against the query edge: a loop requires
   * identical ids, a non-loop under vertex isomorphism requires distinct ids, and in any other
   * case the ids are not inspected.
   *
   * @param triple triple to be checked
   * @return True if the endpoints of the triple are admissible
   */
  private boolean hasAdmissibleEndpoints(Triple triple) {
    if (!isLoop && !isVertexIso) {
      return true;
    }
    boolean sameVertex = triple.getSourceId().equals(triple.getTargetId());
    // Loop: ids must be equal. Otherwise (vertex isomorphism): ids must differ.
    return sameVertex == isLoop;
  }

  /**
   * Checks if the triple holds for the predicate
   *
   * @param triple triple to be filtered
   * @return True if the triple holds for the predicate
   */
  private boolean filter(Triple triple) {
    Embedding filterEmbedding = EmbeddingFactory.fromTriple(
      triple,
      sourceFilterPropertyKeys, edgeFilterPropertyKeys, targetFilterPropertyKeys,
      sourceVariable, targetVariable
    );
    return predicates.evaluate(filterEmbedding, filterMetaData);
  }

  /**
   * Looks up the projection property keys of a variable.
   *
   * @param projectionPropertyKeys property keys used for projection, looked up by variable
   * @param variable query variable
   * @return the keys registered for the variable or a new empty list if there is no entry
   */
  private static List<String> projectionKeysOf(Map<String, List<String>> projectionPropertyKeys,
    String variable) {
    return projectionPropertyKeys.getOrDefault(variable, new ArrayList<>());
  }

  /**
   * Creates the {@code EmbeddingMetaData} of the embedding used for filtering. Source, edge and
   * target are registered as entry columns 0, 1 and 2; the property keys the predicates use are
   * numbered consecutively in the order source, edge, target.
   * <p>
   * NOTE(review): if source and target variable are equal, the same variable is registered twice
   * (entry columns 0 and 2, and two property column ranges). Presumably the later registration
   * wins; confirm that this matches the layout produced by {@code EmbeddingFactory.fromTriple}.
   *
   * @param predicates filter predicates
   * @param sourceVariable source variable
   * @param edgeVariable edge variable
   * @param targetVariable target variable
   * @return filter embedding meta data
   */
  private static EmbeddingMetaData createFilterMetaData(CNF predicates, String sourceVariable,
    String edgeVariable, String targetVariable) {

    String[] variables = {sourceVariable, edgeVariable, targetVariable};
    EmbeddingMetaData.EntryType[] entryTypes = {
      EmbeddingMetaData.EntryType.VERTEX,
      EmbeddingMetaData.EntryType.EDGE,
      EmbeddingMetaData.EntryType.VERTEX
    };

    EmbeddingMetaData metaData = new EmbeddingMetaData();
    for (int entryColumn = 0; entryColumn < variables.length; entryColumn++) {
      metaData.setEntryColumn(variables[entryColumn], entryTypes[entryColumn], entryColumn);
    }

    int propertyColumn = 0;
    for (String variable : variables) {
      for (String propertyKey : predicates.getPropertyKeys(variable)) {
        metaData.setPropertyColumn(variable, propertyKey, propertyColumn);
        propertyColumn++;
      }
    }

    return metaData;
  }
}