// SetPairBulkTraverser.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.traverser;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.impl.functions.utils.Superstep;
import org.gradoop.flink.model.impl.operators.matching.common.MatchStrategy;
import org.gradoop.flink.model.impl.operators.matching.common.query.TraversalCode;
import org.gradoop.flink.model.impl.operators.matching.common.tuples.Embedding;
import org.gradoop.flink.model.impl.operators.matching.common.tuples.IdWithCandidates;
import org.gradoop.flink.model.impl.operators.matching.common.tuples.TripleWithCandidates;
import org.gradoop.flink.model.impl.operators.matching.single.preserving.explorative.tuples.EmbeddingWithTiePoint;
/**
 * Traverser that derives {@link Embedding}s from a data graph by following the
 * steps of a {@link TraversalCode}, one step per iteration.
 *
 * The loop itself is expressed as a Flink bulk iteration
 * ({@link IterativeDataSet}).
 *
 * @param <K> key type
 */
public class SetPairBulkTraverser<K> extends SetPairTraverser<K> {

  /**
   * Creates a new distributed traverser with default settings: isomorphism
   * matching, join strategies chosen by the optimizer and no debug mappings.
   *
   * @param traversalCode describes the traversal through the graph
   * @param vertexCount   number of query vertices
   * @param edgeCount     number of query edges
   * @param keyClazz      needed for embedding initialization
   */
  public SetPairBulkTraverser(TraversalCode traversalCode,
    int vertexCount, int edgeCount, Class<K> keyClazz) {
    this(
      traversalCode,
      MatchStrategy.ISOMORPHISM,
      vertexCount,
      edgeCount,
      keyClazz,
      JoinOperatorBase.JoinHint.OPTIMIZER_CHOOSES, // edge step join
      JoinOperatorBase.JoinHint.OPTIMIZER_CHOOSES, // vertex step join
      null,  // no vertex debug mapping
      null); // no edge debug mapping
  }

  /**
   * Creates a new distributed traverser; all arguments are handed to the
   * parent {@link SetPairTraverser} unchanged.
   *
   * @param traversalCode          describes the graph traversal
   * @param matchStrategy          matching strategy for vertex and edge mapping
   * @param vertexCount            number of query vertices
   * @param edgeCount              number of query edges
   * @param keyClazz               key type for embedding initialization
   * @param edgeStepJoinStrategy   join strategy for edge extension
   * @param vertexStepJoinStrategy join strategy for vertex extension
   * @param vertexMapping          used for debug
   * @param edgeMapping            used for debug
   */
  public SetPairBulkTraverser(TraversalCode traversalCode, MatchStrategy matchStrategy,
    int vertexCount, int edgeCount,
    Class<K> keyClazz,
    JoinOperatorBase.JoinHint edgeStepJoinStrategy,
    JoinOperatorBase.JoinHint vertexStepJoinStrategy,
    DataSet<Tuple2<K, PropertyValue>> vertexMapping,
    DataSet<Tuple2<K, PropertyValue>> edgeMapping) {
    super(traversalCode, matchStrategy, vertexCount, edgeCount, keyClazz,
      edgeStepJoinStrategy, vertexStepJoinStrategy, vertexMapping, edgeMapping);
  }

  /**
   * Runs the bulk iteration, seeded with the initial embeddings built from the
   * vertex candidates, and returns only the embedding part of each result.
   *
   * @param vertices vertex candidates
   * @param edges    edge candidates
   * @return embeddings found by the traversal, each wrapped in a {@link Tuple1}
   */
  @Override
  public DataSet<Tuple1<Embedding<K>>> traverse(
    DataSet<IdWithCandidates<K>> vertices,
    DataSet<TripleWithCandidates<K>> edges) {

    DataSet<EmbeddingWithTiePoint<K>> seeds = buildInitialEmbeddings(vertices);
    DataSet<EmbeddingWithTiePoint<K>> results = iterate(vertices, edges, seeds);

    // keep only tuple field 1 (the embedding) of each result
    return results.project(1);
  }

  /**
   * This traverser always reports itself as iterative.
   *
   * @return {@code true}
   */
  @Override
  boolean isIterative() {
    return true;
  }

  /**
   * Explores the data graph iteratively using the provided traversal code.
   * Each iteration first extends the current embeddings along edges and then
   * along vertices.
   *
   * @param vertices          vertex candidates
   * @param edges             edge candidates
   * @param initialEmbeddings initial embeddings which are extended in each
   *                          iteration
   * @return final embeddings
   */
  private DataSet<EmbeddingWithTiePoint<K>> iterate(
    DataSet<IdWithCandidates<K>> vertices,
    DataSet<TripleWithCandidates<K>> edges,
    DataSet<EmbeddingWithTiePoint<K>> initialEmbeddings) {

    // ITERATION HEAD
    // the traversal code's step count is the upper bound for the iterations
    int maxIterations = getTraversalCode().getSteps().size();
    IterativeDataSet<EmbeddingWithTiePoint<K>> embeddings =
      initialEmbeddings.iterate(maxIterations);

    // ITERATION BODY

    // get current superstep; a single element is enough to drive the mapper
    DataSet<EmbeddingWithTiePoint<K>> anyEmbedding = embeddings.first(1);
    DataSet<Integer> superstep = anyEmbedding.map(new Superstep<>());

    // traverse to outgoing/incoming edges
    String[] edgeStepForwardedFields = {
      "f0" // forward edge id
    };
    DataSet<EmbeddingWithTiePoint<K>> afterEdgeStep = traverseEdges(
      edges,
      embeddings,
      superstep,
      TraverserStrategy.SET_PAIR_BULK_ITERATION,
      edgeStepForwardedFields);

    // traverse to vertices
    DataSet<EmbeddingWithTiePoint<K>> afterVertexStep = traverseVertices(
      vertices,
      afterEdgeStep,
      superstep,
      TraverserStrategy.SET_PAIR_BULK_ITERATION);

    // ITERATION FOOTER
    // the step result is both the next partial solution and the termination
    // criterion of the bulk iteration
    return embeddings.closeWith(afterVertexStep, afterVertexStep);
  }
}