// RollUp.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.rollup;
import org.apache.flink.api.java.DataSet;
import org.gradoop.common.model.impl.pojo.EPGMEdge;
import org.gradoop.common.model.impl.pojo.EPGMGraphHead;
import org.gradoop.common.model.impl.pojo.EPGMVertex;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.api.functions.AggregateFunction;
import org.gradoop.flink.model.api.operators.UnaryGraphToCollectionOperator;
import org.gradoop.flink.model.impl.epgm.GraphCollection;
import org.gradoop.flink.model.impl.epgm.LogicalGraph;
import org.gradoop.flink.model.impl.functions.epgm.SetProperty;
import org.gradoop.flink.model.impl.operators.grouping.GroupingStrategy;
import java.util.ArrayList;
import java.util.List;
/**
 * Base class of the rollUp operators. Starting from an ordered list of grouping keys, every
 * non-empty prefix of that list is used for a separate grouping of the input graph, analogous to
 * the rollUp operation in SQL. For the keys A, B and C this yields the three key lists
 * {A,B,C}, {A,B} and {A}, and therefore three grouped graphs in the resulting graph collection.
 * Whether the prefixes are taken from the vertex or from the edge grouping keys is decided by
 * the concrete sub class.
 */
public abstract class RollUp implements UnaryGraphToCollectionOperator {

  /**
   * Grouping keys for vertices.
   */
  protected final List<String> vertexGroupingKeys;

  /**
   * Aggregate functions for vertices.
   */
  protected final List<AggregateFunction> vertexAggregateFunctions;

  /**
   * Grouping keys for edges.
   */
  protected final List<String> edgeGroupingKeys;

  /**
   * Aggregate functions for edges.
   */
  protected final List<AggregateFunction> edgeAggregateFunctions;

  /**
   * Grouping strategy handed to each grouping; may be replaced via
   * {@link RollUp#setGroupingStrategy(GroupingStrategy)}.
   */
  protected GroupingStrategy strategy;

  /**
   * Creates a rollUp operator that initially uses {@link GroupingStrategy#GROUP_REDUCE} as
   * grouping strategy. Call {@link RollUp#setGroupingStrategy(GroupingStrategy)} to select a
   * different one. The given lists are stored as passed in; no copies are made.
   *
   * @param vertexGroupingKeys       grouping keys to group vertices
   * @param vertexAggregateFunctions aggregate functions to apply on super vertices
   * @param edgeGroupingKeys         grouping keys to group edges
   * @param edgeAggregateFunctions   aggregate functions to apply on super edges
   */
  RollUp(
    List<String> vertexGroupingKeys,
    List<AggregateFunction> vertexAggregateFunctions,
    List<String> edgeGroupingKeys,
    List<AggregateFunction> edgeAggregateFunctions) {
    this.strategy = GroupingStrategy.GROUP_REDUCE;
    this.vertexGroupingKeys = vertexGroupingKeys;
    this.edgeGroupingKeys = edgeGroupingKeys;
    this.vertexAggregateFunctions = vertexAggregateFunctions;
    this.edgeAggregateFunctions = edgeAggregateFunctions;
  }

  /**
   * Runs the rollUp on the given input graph: one grouping is executed per key combination
   * returned by {@link RollUp#getGroupingKeyCombinations()}, the graph head of each grouped graph
   * is tagged with the comma-separated keys that were used, and the elements of all grouped
   * graphs are united into a single graph collection.
   *
   * @param graph input graph
   * @return graph collection containing all differently grouped graphs, or an empty collection
   *         if nothing was accumulated (e.g. there was no key combination)
   */
  @Override
  public GraphCollection execute(LogicalGraph graph) {
    // accumulators; they stay null until the first grouped graph has been produced
    DataSet<EPGMGraphHead> allGraphHeads = null;
    DataSet<EPGMVertex> allVertices = null;
    DataSet<EPGMEdge> allEdges = null;

    // run one grouping per key combination
    for (List<String> keys : getGroupingKeyCombinations()) {
      LogicalGraph grouped = applyGrouping(graph, keys);

      // remember the keys used for this grouping as a property on the grouped graph's head
      PropertyValue usedKeys = PropertyValue.create(String.join(",", keys));
      DataSet<EPGMGraphHead> taggedGraphHead =
        grouped.getGraphHead().map(new SetProperty<>(getGraphPropertyKey(), usedKeys));

      if (allGraphHeads == null || allVertices == null || allEdges == null) {
        // nothing (complete) accumulated yet: start with the elements of this grouped graph
        allGraphHeads = taggedGraphHead;
        allVertices = grouped.getVertices();
        allEdges = grouped.getEdges();
      } else {
        // otherwise append the elements of this grouped graph to the accumulated ones
        allGraphHeads = allGraphHeads.union(taggedGraphHead);
        allVertices = allVertices.union(grouped.getVertices());
        allEdges = allEdges.union(grouped.getEdges());
      }
    }

    // The accumulators start out as null, so they may still be null at this point. An empty
    // collection is only created in that case, to avoid the overhead otherwise.
    if (allGraphHeads == null || allVertices == null || allEdges == null) {
      return graph.getCollectionFactory().createEmptyCollection();
    }
    return graph.getCollectionFactory().fromDataSets(allGraphHeads, allVertices, allEdges);
  }

  /**
   * Builds the rollUp combinations of the supplied grouping keys: every non-empty prefix of the
   * list, ordered from the longest prefix (all keys) down to the shortest (first key only). Each
   * combination is an independent copy and not a view on the input list.
   *
   * @param groupingKeys list of all grouping keys to be combined
   * @return list containing all combinations of grouping keys, empty if no keys were given
   */
  List<List<String>> createGroupingKeyCombinations(List<String> groupingKeys) {
    List<List<String>> prefixes = new ArrayList<>();
    for (int prefixLength = groupingKeys.size(); prefixLength > 0; prefixLength--) {
      List<String> prefix = new ArrayList<>(groupingKeys.subList(0, prefixLength));
      prefixes.add(prefix);
    }
    return prefixes;
  }

  /**
   * Sets the grouping strategy used for each grouping. Unless changed through this method,
   * {@link GroupingStrategy#GROUP_REDUCE} is used.
   *
   * @param strategy the strategy to use
   */
  public void setGroupingStrategy(GroupingStrategy strategy) {
    this.strategy = strategy;
  }

  /**
   * Returns the key of the property that is set on the graph head of every grouped graph in the
   * resulting collection. The property's value lists the grouping keys used for that graph.
   *
   * @return the property key as string
   */
  abstract String getGraphPropertyKey();

  /**
   * Applies the grouping to the given logical graph, using the given keys as vertex or edge
   * grouping keys (which of the two depends on the child class).
   *
   * @param graph        the graph the grouping is applied on
   * @param groupingKeys the vertex or edge grouping keys to use
   * @return the grouped graph
   */
  abstract LogicalGraph applyGrouping(LogicalGraph graph, List<String> groupingKeys);

  /**
   * Returns all vertex or edge grouping key combinations that are rolled up; the child class
   * decides whether the vertex or the edge keys are used.
   * {@link RollUp#createGroupingKeyCombinations(List)} is available to build the combinations.
   *
   * @return a list of all vertex or edge grouping key combinations used for rollup grouping
   */
  abstract List<List<String>> getGroupingKeyCombinations();
}