ApplySubgraph.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.subgraph;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.operators.ApplicableUnaryBaseGraphToBaseGraphOperator;
import org.gradoop.flink.model.impl.functions.epgm.Id;
import org.gradoop.flink.model.impl.functions.epgm.SourceId;
import org.gradoop.flink.model.impl.functions.epgm.TargetId;
import org.gradoop.flink.model.impl.operators.subgraph.functions.EdgeToSourceAndTargetIdWithGraphIds;
import org.gradoop.flink.model.impl.functions.utils.RightSideWithLeftGraphs;
import org.gradoop.flink.model.impl.operators.subgraph.functions.RightSideWithLeftGraphs2To1;
import org.gradoop.flink.model.impl.operators.verify.Verify;
/**
* Extracts a subgraph from each base graph in a graph collection using
* the given filter functions. The graph head stays unchanged for the subgraph.
* <p>
* The operator is able to:
* <ol>
* <li>extract vertex-induced subgraph</li>
* <li>extract edge-induced subgraph via join + union strategy</li>
* <li>extract edge-induced subgraph via project + union + join strategy</li>
* <li>extract subgraph based on vertex and edge filter function without verification
* (no joins, use {@link Verify} to verify the subgraph)</li>
* </ol>
*
* @param <G> type of the graph head
* @param <V> the vertex type
* @param <E> the edge type
* @param <LG> type of the logical graph instance
* @param <GC> type of the graph collection
*/
public class ApplySubgraph<
  G extends GraphHead,
  V extends Vertex,
  E extends Edge,
  LG extends BaseGraph<G, V, E, LG, GC>,
  GC extends BaseGraphCollection<G, V, E, LG, GC>> extends Subgraph<G, V, E, LG, GC>
  implements ApplicableUnaryBaseGraphToBaseGraphOperator<GC> {

  /**
   * Creates a new instance of the collection-level subgraph operator.
   * <p>
   * If both filter functions are not {@code null}, the operator returns the subgraph
   * defined by the filtered vertices and edges.
   * <p>
   * If the {@code edgeFilterFunction} is {@code null}, the operator returns the
   * vertex-induced subgraph.
   * <p>
   * If the {@code vertexFilterFunction} is {@code null}, the operator returns the
   * edge-induced subgraph.
   *
   * @param vertexFilterFunction vertex filter function
   * @param edgeFilterFunction   edge filter function
   * @param strategy             sets the execution strategy for the operator
   */
  public ApplySubgraph(FilterFunction<V> vertexFilterFunction,
    FilterFunction<E> edgeFilterFunction, Strategy strategy) {
    super(vertexFilterFunction, edgeFilterFunction, strategy);
  }

  /**
   * Dispatches to the extraction routine that matches the configured strategy.
   *
   * @param collection collection of supergraphs
   * @return collection of subgraphs
   * @throws IllegalArgumentException if the configured strategy is not handled here
   */
  @Override
  public GC executeForGVELayout(GC collection) {
    switch (strategy) {
    case BOTH:
      return subgraph(collection);
    case VERTEX_INDUCED:
      return vertexInducedSubgraph(collection);
    case EDGE_INDUCED:
      return edgeInducedSubgraph(collection);
    case EDGE_INDUCED_PROJECT_FIRST:
      return edgeInducedSubgraphProjectFirst(collection);
    default:
      throw new IllegalArgumentException("Strategy " + strategy + " is not supported");
    }
  }

  /**
   * Handles the transactional layout by delegating to
   * {@link #executeForGVELayout(BaseGraphCollection)}.
   *
   * @param collection collection of supergraphs
   * @return collection of subgraphs
   */
  @Override
  public GC executeForTxLayout(GC collection) {
    return executeForGVELayout(collection);
  }

  /**
   * Builds the vertex-induced subgraphs of the given supergraphs.
   * <p>
   * Only the vertices accepted by the vertex filter function are kept. All edges of the
   * input are handed over unfiltered and the resulting collection is passed through
   * {@code verify()} before it is returned.
   *
   * @param collection collection of supergraphs
   * @return collection of vertex-induced subgraphs
   */
  private GC vertexInducedSubgraph(GC collection) {
    DataSet<V> retainedVertices = collection.getVertices().filter(vertexFilterFunction);

    GC unverified = collection.getFactory()
      .fromDataSets(collection.getGraphHeads(), retainedVertices, collection.getEdges());

    return unverified.verify();
  }

  /**
   * Builds the edge-induced subgraphs of the given supergraphs (join + union strategy).
   * <p>
   * Only the edges accepted by the edge filter function are kept. The vertex set is
   * obtained by joining the kept edges with the vertices, once on the source id and
   * once on the target id, and merging both join results.
   *
   * @param collection collection of supergraphs
   * @return collection of edge-induced subgraphs
   */
  private GC edgeInducedSubgraph(GC collection) {
    DataSet<E> retainedEdges = collection.getEdges().filter(edgeFilterFunction);

    // vertices that are the source of at least one retained edge
    DataSet<V> sourceVertices = retainedEdges
      .join(collection.getVertices())
      .where(new SourceId<>()).equalTo(new Id<>())
      .with(new RightSideWithLeftGraphs<>());

    // vertices that are the target of at least one retained edge
    DataSet<V> targetVertices = retainedEdges
      .join(collection.getVertices())
      .where(new TargetId<>()).equalTo(new Id<>())
      .with(new RightSideWithLeftGraphs<>());

    // NOTE(review): only one element per vertex id survives the distinct step. If
    // RightSideWithLeftGraphs yields differing graph ids for the same vertex (one per
    // matching edge), confirm that dropping the other instances is intended.
    DataSet<V> inducedVertices = sourceVertices
      .union(targetVertices)
      .distinct(new Id<>());

    return collection.getFactory()
      .fromDataSets(collection.getGraphHeads(), inducedVertices, retainedEdges);
  }

  /**
   * Builds the edge-induced subgraphs of the given supergraphs
   * (project + union + join strategy).
   * <p>
   * Only the edges accepted by the edge filter function are kept. Each kept edge is
   * first flat-mapped by {@link EdgeToSourceAndTargetIdWithGraphIds}; the result is made
   * distinct on its first field and then joined once with the vertices on that field.
   *
   * @param collection collection of supergraphs
   * @return collection of edge-induced subgraphs
   */
  private GC edgeInducedSubgraphProjectFirst(GC collection) {
    DataSet<E> retainedEdges = collection.getEdges().filter(edgeFilterFunction);

    DataSet<V> inducedVertices = retainedEdges
      .flatMap(new EdgeToSourceAndTargetIdWithGraphIds<>())
      .distinct(0)
      .join(collection.getVertices())
      .where(0).equalTo(new Id<>())
      .with(new RightSideWithLeftGraphs2To1<>());

    return collection.getFactory()
      .fromDataSets(collection.getGraphHeads(), inducedVertices, retainedEdges);
  }

  /**
   * Builds the subgraphs defined by both filter functions.
   * <p>
   * Vertices are restricted by the vertex filter function and edges by the edge filter
   * function, independently of each other. No joins are performed, so the consistency of
   * the result is not checked here.
   *
   * @param collection collection of supergraphs
   * @return collection of subgraphs
   */
  private GC subgraph(GC collection) {
    DataSet<V> retainedVertices = collection.getVertices().filter(vertexFilterFunction);
    DataSet<E> retainedEdges = collection.getEdges().filter(edgeFilterFunction);

    return collection.getFactory()
      .fromDataSets(collection.getGraphHeads(), retainedVertices, retainedEdges);
  }
}