ExplorativePatternMatching.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.GraphHeadFactory;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.api.entities.VertexFactory;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.epgm.BaseGraphFactory;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.VertexFromId;
import org.gradoop.flink.model.impl.functions.tuple.ObjectTo1;
import org.gradoop.flink.model.impl.functions.tuple.Value0Of2;
import org.gradoop.flink.model.impl.functions.tuple.Value1Of2;
import org.gradoop.flink.model.impl.operators.matching.common.MatchStrategy;
import org.gradoop.flink.model.impl.operators.matching.common.PostProcessor;
import org.gradoop.flink.model.impl.operators.matching.common.PreProcessor;
import org.gradoop.flink.model.impl.operators.matching.common.functions.AddGraphElementToNewGraph;
import org.gradoop.flink.model.impl.operators.matching.common.functions.ElementsFromEmbedding;
import org.gradoop.flink.model.impl.operators.matching.common.functions.MatchingVertices;
import org.gradoop.flink.model.impl.operators.matching.common.query.DFSTraverser;
import org.gradoop.flink.model.impl.operators.matching.common.query.TraversalCode;
import org.gradoop.flink.model.impl.operators.matching.common.query.Traverser;
import org.gradoop.flink.model.impl.operators.matching.common.tuples.Embedding;
import org.gradoop.flink.model.impl.operators.matching.common.tuples.IdWithCandidates;
import org.gradoop.flink.model.impl.operators.matching.common.tuples.TripleWithCandidates;
import org.gradoop.flink.model.impl.operators.matching.single.PatternMatching;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.traverser.SetPairBulkTraverser;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.traverser.SetPairForLoopTraverser;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.traverser.SetPairTraverser;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.traverser.TraverserStrategy;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.traverser.TripleForLoopTraverser;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.traverser.TripleTraverser;
import java.util.Objects;
import static org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint.OPTIMIZER_CHOOSES;
/**
 * Algorithm detects subgraphs by traversing the search graph according to a
 * given traversal code which is derived from the query pattern.
 * <p>
 * Instances are configured and created via {@link Builder}.
 *
 * @param <G> The graph head type.
 * @param <V> The vertex type.
 * @param <E> The edge type.
 * @param <LG> The graph type.
 * @param <GC> The graph collection type.
 */
public class ExplorativePatternMatching<
  G extends GraphHead,
  V extends Vertex,
  E extends Edge,
  LG extends BaseGraph<G, V, E, LG, GC>,
  GC extends BaseGraphCollection<G, V, E, LG, GC>> extends PatternMatching<G, V, E, LG, GC> {

  /**
   * Name for broadcast set which contains the superstep id.
   */
  public static final String BC_SUPERSTEP = "bc_superstep";

  /**
   * Logger
   */
  private static final Logger LOG = LogManager.getLogger(ExplorativePatternMatching.class);

  /**
   * Holds information on how to traverse the graph. The instance is bound to this
   * operator's query handler in the constructor.
   */
  private final Traverser traverser;

  /**
   * Join strategy used for the join between embeddings and edges
   */
  private final JoinOperatorBase.JoinHint edgeStepJoinStrategy;

  /**
   * Join strategy used for the join between embeddings and vertices
   */
  private final JoinOperatorBase.JoinHint vertexStepJoinStrategy;

  /**
   * Strategy for vertex and edge mappings
   */
  private final MatchStrategy matchStrategy;

  /**
   * Strategy iterating the graph
   */
  private final TraverserStrategy traverserStrategy;

  /**
   * Create new operator instance.
   * <p>
   * Note: the given {@code traverser} is mutated — its query handler is replaced by the
   * query handler of this operator — so a traverser instance must not be shared between
   * operators.
   *
   * @param query GDL query graph
   * @param attachData true, if original data shall be attached to the result
   * @param matchStrategy match strategy for vertex and edge mappings
   * @param traverserStrategy iteration strategy for distributed traversal
   * @param traverser Traverser used for the query graph
   * @param edgeStepJoinStrategy Join strategy for edge extension
   * @param vertexStepJoinStrategy Join strategy for vertex extension
   */
  private ExplorativePatternMatching(String query, boolean attachData,
    MatchStrategy matchStrategy,
    TraverserStrategy traverserStrategy,
    Traverser traverser,
    JoinOperatorBase.JoinHint edgeStepJoinStrategy,
    JoinOperatorBase.JoinHint vertexStepJoinStrategy) {
    super(query, attachData, LOG);
    this.matchStrategy = matchStrategy;
    this.traverserStrategy = traverserStrategy;
    this.traverser = traverser;
    // bind the traverser to the query parsed by the super class
    this.traverser.setQueryHandler(getQueryHandler());
    this.edgeStepJoinStrategy = edgeStepJoinStrategy;
    this.vertexStepJoinStrategy = vertexStepJoinStrategy;
  }

  /**
   * Builds the result collection by pairing every vertex that passes
   * {@link MatchingVertices} with a new graph head (via {@link AddGraphElementToNewGraph}),
   * using the variable of the first query vertex.
   * <p>
   * If data shall not be attached, each matching vertex is replaced by a vertex that only
   * carries its id.
   *
   * @param graph search graph
   * @return collection of graphs, one per matching vertex
   */
  @Override
  protected GC executeForVertex(LG graph) {
    BaseGraphFactory<G, V, E, LG, GC> graphFactory = graph.getFactory();
    GraphHeadFactory<G> graphHeadFactory = graphFactory.getGraphHeadFactory();
    VertexFactory<V> vertexFactory = graphFactory.getVertexFactory();
    String variable = getQueryHandler().getVertices().iterator().next().getVariable();

    DataSet<V> matchingVertices = graph.getVertices()
      .filter(new MatchingVertices<>(getQuery()));

    if (!doAttachData()) {
      // drop labels/properties: rebuild each vertex from its id only
      matchingVertices = matchingVertices
        .map(new Id<>())
        .map(new ObjectTo1<>())
        .map(new VertexFromId<>(vertexFactory));
    }

    // explicit type information, since the generic tuple type cannot be extracted by Flink
    DataSet<Tuple2<V, G>> pairs = matchingVertices
      .map(new AddGraphElementToNewGraph<>(graphHeadFactory, variable))
      .returns(new TupleTypeInfo<>(
        TypeExtractor.getForClass(vertexFactory.getType()),
        TypeExtractor.getForClass(graphHeadFactory.getType())));

    return graph.getCollectionFactory().fromDataSets(
      pairs.map(new Value1Of2<>()),
      pairs.map(new Value0Of2<>()));
  }

  /**
   * Computes the traversal code for the query, runs the distributed traverser selected by
   * {@link #traverserStrategy} and converts the resulting embeddings into a graph collection.
   *
   * @param graph search graph
   * @return collection of matched subgraphs
   * @throws IllegalArgumentException if the configured traverser strategy is not supported
   */
  @Override
  protected GC executeForPattern(LG graph) {
    TraversalCode traversalCode = traverser.traverse();

    DataSet<Tuple1<Embedding<GradoopId>>> embeddings;

    if (traverserStrategy == TraverserStrategy.SET_PAIR_BULK_ITERATION ||
      traverserStrategy == TraverserStrategy.SET_PAIR_FOR_LOOP_ITERATION) {
      //--------------------------------------------------------------------------
      // Pre-processing (filter candidates)
      //--------------------------------------------------------------------------
      DataSet<IdWithCandidates<GradoopId>> vertices = PreProcessor.filterVertices(
        graph, getQuery());
      DataSet<TripleWithCandidates<GradoopId>> edges = PreProcessor.filterEdges(
        graph, getQuery());
      //--------------------------------------------------------------------------
      // Exploration via Traversal
      //--------------------------------------------------------------------------
      SetPairTraverser<GradoopId> distributedTraverser = createSetPairTraverser(traversalCode);
      embeddings = distributedTraverser.traverse(vertices, edges);
    } else if (traverserStrategy == TraverserStrategy.TRIPLES_FOR_LOOP_ITERATION) {
      DataSet<TripleWithCandidates<GradoopId>> triples = PreProcessor
        .filterTriplets(graph, getQuery());
      TripleTraverser<GradoopId> distributedTraverser = new TripleForLoopTraverser<>(
        traversalCode, matchStrategy,
        traverser.getQueryHandler().getVertexCount(),
        traverser.getQueryHandler().getEdgeCount(),
        GradoopId.class, edgeStepJoinStrategy, getVertexMapping(), getEdgeMapping());
      embeddings = distributedTraverser.traverse(triples);
    } else {
      throw new IllegalArgumentException("Unsupported traverser strategy: " + traverserStrategy);
    }

    //--------------------------------------------------------------------------
    // Post-Processing (build Graph Collection from embeddings)
    //--------------------------------------------------------------------------
    DataSet<Element> elements = embeddings
      .flatMap(new ElementsFromEmbedding(traversalCode,
        graph.getFactory().getGraphHeadFactory(),
        graph.getFactory().getVertexFactory(),
        graph.getFactory().getEdgeFactory(),
        getQueryHandler()
      ));

    return doAttachData() ?
      PostProcessor.extractGraphCollectionWithData(elements, graph, true) :
      PostProcessor.extractGraphCollection(elements, graph.getCollectionFactory(), true);
  }

  /**
   * Creates the set-pair based distributed traverser for the configured strategy:
   * a {@link SetPairBulkTraverser} for {@link TraverserStrategy#SET_PAIR_BULK_ITERATION},
   * a {@link SetPairForLoopTraverser} otherwise.
   *
   * @param traversalCode traversal code derived from the query graph
   * @return distributed set-pair traverser
   */
  private SetPairTraverser<GradoopId> createSetPairTraverser(TraversalCode traversalCode) {
    if (traverserStrategy == TraverserStrategy.SET_PAIR_BULK_ITERATION) {
      return new SetPairBulkTraverser<>(traversalCode, matchStrategy,
        traverser.getQueryHandler().getVertexCount(), traverser.getQueryHandler().getEdgeCount(),
        GradoopId.class, edgeStepJoinStrategy, vertexStepJoinStrategy, getVertexMapping(),
        getEdgeMapping());
    }
    return new SetPairForLoopTraverser<>(traversalCode, matchStrategy,
      traverser.getQueryHandler().getVertexCount(), traverser.getQueryHandler().getEdgeCount(),
      GradoopId.class, edgeStepJoinStrategy, vertexStepJoinStrategy, getVertexMapping(),
      getEdgeMapping());
  }

  /**
   * Used for configuring and creating a new {@link ExplorativePatternMatching} operator instance.
   * <p>
   * A builder may be used to create multiple operators; unless a traverser is set explicitly,
   * each built operator receives its own {@link DFSTraverser}.
   */
  public static final class Builder {
    /**
     * GDL query string
     */
    private String query;
    /**
     * Attach original vertex and edge data
     */
    private boolean attachData;
    /**
     * Matching strategy for vertex and edge mappings
     */
    private MatchStrategy matchStrategy;
    /**
     * Iteration strategy for traversing the graph
     */
    private TraverserStrategy traverserStrategy;
    /**
     * Traverser set via {@link #setTraverser(Traverser)}; only used if
     * {@link #customTraverser} is true.
     */
    private Traverser traverser;
    /**
     * True, iff a traverser has been set explicitly. Otherwise a fresh {@link DFSTraverser} is
     * created per built operator, because the operator mutates its traverser (binds it to its
     * own query) and a shared default instance would leak one query into another operator.
     */
    private boolean customTraverser;
    /**
     * Join strategy for edge extensions during traversal
     */
    private JoinOperatorBase.JoinHint edgeStepJoinStrategy;
    /**
     * Join strategy for vertex extensions during traversal
     */
    private JoinOperatorBase.JoinHint vertexStepJoinStrategy;

    /**
     * Creates a new builder instance with default settings: no data attached, isomorphism,
     * bulk set-pair iteration, DFS traversal and optimizer-chosen join strategies.
     */
    public Builder() {
      this.attachData = false;
      this.matchStrategy = MatchStrategy.ISOMORPHISM;
      this.traverserStrategy = TraverserStrategy.SET_PAIR_BULK_ITERATION;
      this.traverser = null;
      this.customTraverser = false;
      this.edgeStepJoinStrategy = OPTIMIZER_CHOOSES;
      this.vertexStepJoinStrategy = OPTIMIZER_CHOOSES;
    }

    /**
     * Set the GDL query string. e.g. {@code (a)-->(b)}
     *
     * @param query GDL query
     * @return modified builder
     */
    public Builder setQuery(String query) {
      this.query = query;
      return this;
    }

    /**
     * Set if the original vertex and edge data shall be attached to the result.
     *
     * @param attachData true, iff data shall be attached
     * @return modified builder
     */
    public Builder setAttachData(boolean attachData) {
      this.attachData = attachData;
      return this;
    }

    /**
     * Set matching strategy for vertex and edge embeddings (e.g. isomorphism).
     *
     * @param matchStrategy matching strategy
     * @return modified builder
     */
    public Builder setMatchStrategy(MatchStrategy matchStrategy) {
      this.matchStrategy = matchStrategy;
      return this;
    }

    /**
     * Set iteration strategy for traversing the graph (e.g. bulk traversal).
     *
     * @param traverserStrategy iteration strategy
     * @return modified builder
     */
    public Builder setTraverserStrategy(TraverserStrategy traverserStrategy) {
      this.traverserStrategy = traverserStrategy;
      return this;
    }

    /**
     * Sets the traverser to describe the distributed graph traversal.
     * <p>
     * The given instance is bound to the query of the operator it is passed to, so it should
     * not be shared between multiple built operators.
     *
     * @param traverser traverser for the query graph
     * @return modified builder
     */
    public Builder setTraverser(Traverser traverser) {
      this.traverser = traverser;
      this.customTraverser = true;
      return this;
    }

    /**
     * Sets the join strategy for joining edges during traversal.
     *
     * @param edgeStepJoinStrategy join strategy
     * @return modified builder
     */
    public Builder setEdgeStepJoinStrategy(
      JoinOperatorBase.JoinHint edgeStepJoinStrategy) {
      this.edgeStepJoinStrategy = edgeStepJoinStrategy;
      return this;
    }

    /**
     * Sets the join strategy for joining vertices during traversal.
     *
     * @param vertexStepJoinStrategy join strategy
     * @return modified builder
     */
    public Builder setVertexStepJoinStrategy(
      JoinOperatorBase.JoinHint vertexStepJoinStrategy) {
      this.vertexStepJoinStrategy = vertexStepJoinStrategy;
      return this;
    }

    /**
     * Instantiates a new {@link ExplorativePatternMatching} operator.
     *
     * @param <G> The graph head type.
     * @param <V> The vertex type.
     * @param <E> The edge type.
     * @param <LG> The graph type.
     * @param <GC> The graph collection type.
     * @return operator instance
     * @throws NullPointerException if a required setting is missing
     */
    public <G extends GraphHead,
      V extends Vertex,
      E extends Edge,
      LG extends BaseGraph<G, V, E, LG, GC>,
      GC extends BaseGraphCollection<G, V, E, LG, GC>> ExplorativePatternMatching<G, V, E, LG, GC> build() {
      // a fresh default traverser per operator, since the operator mutates it
      Traverser queryTraverser = customTraverser ? traverser : new DFSTraverser();

      Objects.requireNonNull(query, "Missing GDL query");
      Objects.requireNonNull(matchStrategy, "Missing match strategy");
      Objects.requireNonNull(traverserStrategy, "Missing iteration strategy");
      Objects.requireNonNull(queryTraverser, "Missing traverser");
      Objects.requireNonNull(edgeStepJoinStrategy, "Missing edge step join strategy");
      Objects.requireNonNull(vertexStepJoinStrategy, "Missing vertex step join strategy");

      return new ExplorativePatternMatching<>(query, attachData, matchStrategy, traverserStrategy,
        queryTraverser, edgeStepJoinStrategy, vertexStepJoinStrategy);
    }
  }
}