TransactionalPatternMatching.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.matching.transactional;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.id.GradoopIdSet;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.operators.UnaryBaseGraphCollectionToBaseGraphCollectionOperator;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.tuple.Project4To0And1;
import org.gradoop.flink.model.impl.functions.utils.LeftSide;
import org.gradoop.flink.model.impl.operators.matching.common.functions.MatchingEdges;
import org.gradoop.flink.model.impl.operators.matching.common.functions.MatchingVertices;
import org.gradoop.flink.model.impl.operators.matching.common.tuples.IdWithCandidates;
import org.gradoop.flink.model.impl.operators.matching.common.tuples.TripleWithCandidates;
import org.gradoop.flink.model.impl.operators.matching.transactional.algorithm.PatternMatchingAlgorithm;
import org.gradoop.flink.model.impl.operators.matching.transactional.function.ExpandFirstField;
import org.gradoop.flink.model.impl.operators.matching.transactional.function.FindEmbeddings;
import org.gradoop.flink.model.impl.operators.matching.transactional.function.HasEmbeddings;
import org.gradoop.flink.model.impl.operators.matching.transactional.function.MergeSecondField;
import org.gradoop.flink.model.impl.operators.matching.transactional.function.AddMatchesToProperties;
import org.gradoop.flink.model.impl.operators.matching.transactional.function.BuildGraphWithCandidates;
import org.gradoop.flink.model.impl.operators.matching.transactional.function.BuildIdWithCandidatesAndGraphs;
import org.gradoop.flink.model.impl.operators.matching.transactional.function.BuildTripleWithCandidatesAndGraphs;
import org.gradoop.flink.model.impl.operators.matching.transactional.function.InitGraphHeadWithLineage;
import org.gradoop.flink.model.impl.operators.matching.transactional.function.Project4To0And2AndSwitch;
import org.gradoop.flink.model.impl.operators.matching.transactional.function.Project4To0And3AndSwitch;
import org.gradoop.flink.model.impl.operators.matching.transactional.tuples.GraphWithCandidates;
import org.gradoop.flink.model.impl.operators.matching.transactional.function.AddGraphsToElements;
/**
* Operator to match a given pattern on a graph collection
*
* @param <G> type of the graph head
* @param <V> the vertex type
* @param <E> the edge type
* @param <LG> type of the base graph instance
* @param <GC> type of the graph collection
*/
public class TransactionalPatternMatching<
G extends GraphHead,
V extends Vertex,
E extends Edge,
LG extends BaseGraph<G, V, E, LG, GC>,
GC extends BaseGraphCollection<G, V, E, LG, GC>>
implements UnaryBaseGraphCollectionToBaseGraphCollectionOperator<GC> {
/**
* Query Pattern
*/
private String query;
/**
* Actual used algorithm
*/
private PatternMatchingAlgorithm algorithm;
/**
* Flag that determines what the result consists of:
* false: source graphs with an new property that is true iff the pattern
* was found in this graph
* true: one graph per found embedding
*/
private boolean findEmbeddings;
/**
* Constructor
*
* @param algorithm pattern matching algorithm
* @param query given query-pattern
* @param findEmbeddings flag that determines the type of the return
* false: source graphs with an new property that is
* true iff the pattern was found in this graph
* true: one graph per found embedding
*/
public TransactionalPatternMatching(
String query,
PatternMatchingAlgorithm algorithm,
boolean findEmbeddings) {
this.query = query;
this.algorithm = algorithm;
this.findEmbeddings = findEmbeddings;
}
@Override
public GC execute(GC collection) {
//--------------------------------------------------------------------------
// generate graph-id set witch will be used for generating mappings
//--------------------------------------------------------------------------
DataSet<GradoopId> graphIds = collection.getGraphHeads().map(new Id<>());
//--------------------------------------------------------------------------
// generate mapping from graph-id to vertex candidates
//--------------------------------------------------------------------------
DataSet<Tuple2<GradoopId, IdWithCandidates<GradoopId>>> vertexCandidatesWithGraphs =
collection.getVertices()
.filter(new MatchingVertices<>(query))
.map(new BuildIdWithCandidatesAndGraphs<>(query))
.flatMap(new ExpandFirstField<>())
.join(graphIds)
.where(0).equalTo("*")
.with(new LeftSide<>());
//--------------------------------------------------------------------------
// generate mapping from graph-id to edge candidates
//--------------------------------------------------------------------------
DataSet<Tuple2<GradoopId, TripleWithCandidates<GradoopId>>> edgeCandidatesWithGraphs =
collection.getEdges()
.filter(new MatchingEdges<>(query))
.map(new BuildTripleWithCandidatesAndGraphs<>(query))
.flatMap(new ExpandFirstField<>())
.join(graphIds)
.where(0).equalTo("*")
.with(new LeftSide<>());
//--------------------------------------------------------------------------
// generate graphs with the candidates for their elements
//--------------------------------------------------------------------------
DataSet<GraphWithCandidates> graphs = vertexCandidatesWithGraphs
.coGroup(edgeCandidatesWithGraphs)
.where(0).equalTo(0)
.with(new BuildGraphWithCandidates());
if (findEmbeddings) {
return findEmbeddings(collection, graphs);
} else {
return hasEmbeddings(collection, graphs);
}
}
/**
* Returns the input graph collection with a new property for each graph, that
* states if it contains the embedding.
* @param collection input graph collection
* @param graphs graphs with candidates of their elements
* @return input graph collection with new boolean property
*/
private GC hasEmbeddings(GC collection, DataSet<GraphWithCandidates> graphs) {
//--------------------------------------------------------------------------
// run matching algorithm
//--------------------------------------------------------------------------
DataSet<Tuple2<GradoopId, Boolean>> matches = graphs.map(new HasEmbeddings(algorithm, query));
//--------------------------------------------------------------------------
// join matches to graph heads
//--------------------------------------------------------------------------
DataSet<G> newHeads = collection.getGraphHeads()
.coGroup(matches)
.where(new Id<>()).equalTo(0)
.with(new AddMatchesToProperties<>());
//--------------------------------------------------------------------------
// return updated graph collection
//--------------------------------------------------------------------------
return collection.getFactory().fromDataSets(
newHeads, collection.getVertices(), collection.getEdges());
}
/**
* Finds all embeddings in the given graph and constructs a new graph
* collection consisting of these embeddings.
* @param collection input graph collection
* @param graphs graphs with candidates of their elements
* @return collection of found embeddings
*/
private GC findEmbeddings(GC collection, DataSet<GraphWithCandidates> graphs) {
//--------------------------------------------------------------------------
// run the matching algorithm
//--------------------------------------------------------------------------
DataSet<Tuple4<GradoopId, GradoopId, GradoopIdSet, GradoopIdSet>> embeddings = graphs
.flatMap(new FindEmbeddings(algorithm, query));
//--------------------------------------------------------------------------
// create new graph heads
//--------------------------------------------------------------------------
DataSet<G> newHeads = embeddings
.map(new Project4To0And1<>())
.map(new InitGraphHeadWithLineage<>(collection.getFactory().getGraphHeadFactory()));
//--------------------------------------------------------------------------
// update vertex graphs
//--------------------------------------------------------------------------
DataSet<Tuple2<GradoopId, GradoopIdSet>> verticesWithGraphs = embeddings
.map(new Project4To0And2AndSwitch<>())
.flatMap(new ExpandFirstField<>()).groupBy(0)
.reduceGroup(new MergeSecondField<>());
DataSet<V> newVertices = verticesWithGraphs
.join(collection.getVertices())
.where(0).equalTo(new Id<>())
.with(new AddGraphsToElements<>());
//--------------------------------------------------------------------------
// update edge graphs
//--------------------------------------------------------------------------
DataSet<Tuple2<GradoopId, GradoopIdSet>> edgesWithGraphs = embeddings
.map(new Project4To0And3AndSwitch<>())
.flatMap(new ExpandFirstField<>()).groupBy(0)
.reduceGroup(new MergeSecondField<>());
DataSet<E> newEdges = edgesWithGraphs
.join(collection.getEdges())
.where(0).equalTo(new Id<>())
.with(new AddGraphsToElements<>());
//--------------------------------------------------------------------------
// return the embeddings
//--------------------------------------------------------------------------
return collection.getFactory().fromDataSets(newHeads, newVertices, newEdges);
}
}