KeyedGroupingUtils.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.keyedgrouping;
import org.gradoop.common.model.api.entities.Edge;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.api.entities.GraphHead;
import org.gradoop.common.model.api.entities.Vertex;
import org.gradoop.flink.model.api.epgm.BaseGraph;
import org.gradoop.flink.model.api.epgm.BaseGraphCollection;
import org.gradoop.flink.model.api.functions.AggregateFunction;
import org.gradoop.flink.model.api.functions.KeyFunction;
import org.gradoop.flink.model.api.functions.KeyFunctionWithDefaultValue;
import org.gradoop.flink.model.impl.operators.grouping.Grouping;
import org.gradoop.flink.model.impl.operators.grouping.tuples.LabelGroup;
import org.gradoop.flink.model.impl.operators.keyedgrouping.labelspecific.LabelSpecificAggregatorWrapper;
import org.gradoop.flink.model.impl.operators.keyedgrouping.labelspecific.UnlabeledGroupAggregatorWrapper;
import org.gradoop.flink.model.impl.operators.keyedgrouping.labelspecific.LabelSpecificKeyFunction;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* Utilities for the {@link KeyedGrouping} implementation.<p>
* This class provides functions used to convert arguments of the {@link Grouping} implementation to
* key functions and aggregate functions.
*/
public final class KeyedGroupingUtils {
/**
* No instances of this class are needed.
*/
private KeyedGroupingUtils() {
}
/**
* Extract aggregate functions from a list of label groups.
*
* @param labelGroups A list of label groups.
* @return A list of aggregate functions defined in those label groups.
*/
public static List<AggregateFunction> asAggregateFunctions(List<LabelGroup> labelGroups) {
LabelGroup defaultGroup = getDefaultGroupOrNull(labelGroups);
List<AggregateFunction> functions;
if (defaultGroup == null) {
functions = new ArrayList<>();
AtomicInteger id = new AtomicInteger(0);
Set<String> allLabels = labelGroups.stream().map(LabelGroup::getGroupingLabel)
.collect(Collectors.toSet());
for (LabelGroup labelGroup : labelGroups) {
final String currentLabel = labelGroup.getGroupingLabel();
if (currentLabel.equals(Grouping.DEFAULT_VERTEX_LABEL_GROUP) ||
currentLabel.equals(Grouping.DEFAULT_EDGE_LABEL_GROUP)) {
labelGroup.getAggregateFunctions().forEach(lga -> functions.add(
new UnlabeledGroupAggregatorWrapper(allLabels, lga, (short) id.getAndIncrement())));
} else {
labelGroup.getAggregateFunctions().forEach(lga -> functions.add(
new LabelSpecificAggregatorWrapper(currentLabel, lga, (short) id.getAndIncrement())));
}
}
} else {
functions = defaultGroup.getAggregateFunctions();
}
return functions;
}
/**
* Convert label groups to key functions.
*
* @param useLabels Flag used to indicate whether labels of elements should be used as grouping keys.
* @param labelGroups The label groups to convert.
* @param <T> The element type for the key function.
* @return Key functions corresponding to those groups.
*/
public static <T extends Element> List<KeyFunction<T, ?>> asKeyFunctions(
boolean useLabels, List<LabelGroup> labelGroups) {
LabelGroup defaultGroup = getDefaultGroupOrNull(labelGroups);
List<KeyFunction<T, ?>> newKeys = new ArrayList<>();
if (defaultGroup == null) {
newKeys.add(asKeyFunction(useLabels, labelGroups));
} else {
defaultGroup.getPropertyKeys().forEach(k -> newKeys.add(GroupingKeys.property(k)));
if (useLabels) {
newKeys.add(GroupingKeys.label());
}
}
return newKeys;
}
/**
* Create a label-specific key function from a list of label groups.
*
* @param useLabels Should labels be used for grouping?
* @param labelGroups The label groups to convert.
* @param <T> The element type for the key function.
* @return The label-specific key function.
*/
public static <T extends Element> LabelSpecificKeyFunction<T> asKeyFunction(
boolean useLabels, List<LabelGroup> labelGroups) {
Map<String, List<KeyFunctionWithDefaultValue<T, ?>>> keyFunctions = new HashMap<>();
for (LabelGroup labelGroup : labelGroups) {
final String groupingLabel = labelGroup.getGroupingLabel();
if (keyFunctions.containsKey(groupingLabel)) {
throw new UnsupportedOperationException("Duplicate grouping label: " + groupingLabel);
}
List<KeyFunctionWithDefaultValue<T, ?>> keysForLabel = new ArrayList<>();
if ((groupingLabel.equals(Grouping.DEFAULT_VERTEX_LABEL_GROUP) ||
groupingLabel.equals(Grouping.DEFAULT_EDGE_LABEL_GROUP)) && useLabels) {
keysForLabel.add(GroupingKeys.label());
}
labelGroup.getPropertyKeys().forEach(k -> keysForLabel.add(GroupingKeys.property(k)));
keyFunctions.put(groupingLabel, keysForLabel);
}
Map<String, String> labelUpdateMap = labelGroups.stream()
.collect(Collectors.toMap(LabelGroup::getGroupingLabel, LabelGroup::getGroupLabel));
return new LabelSpecificKeyFunction<>(keyFunctions, labelUpdateMap);
}
/**
* Create a new instance of the {@link KeyedGrouping} operator from a list of vertex and edge label groups.
* This factory method also accepts global aggregators which will be applied to each label group.
*
* @param useVertexLabels Group by vertex label.
* @param useEdgeLabels Group by edge label.
* @param vertexLabelGroups The vertex label groups.
* @param edgeLabelGroups The edge label groups.
* @param globalVertexAggregators A list of aggregate functions applicable for all vertex groups.
* @param globalEdgeAggregators A list of aggregate functions applicable for all edge groups.
* @param <G> The graph head type.
* @param <V> The vertex type.
* @param <E> The edge type.
* @param <LG> The graph type.
* @param <GC> The graph collection type.
* @return A new instance of the grouping operator.
*/
public static <
G extends GraphHead,
V extends Vertex,
E extends Edge,
LG extends BaseGraph<G, V, E, LG, GC>,
GC extends BaseGraphCollection<G, V, E, LG, GC>> KeyedGrouping<G, V, E, LG, GC> createInstance(
boolean useVertexLabels, boolean useEdgeLabels,
List<LabelGroup> vertexLabelGroups, List<LabelGroup> edgeLabelGroups,
List<AggregateFunction> globalVertexAggregators, List<AggregateFunction> globalEdgeAggregators) {
List<AggregateFunction> vertexAggregators = asAggregateFunctions(vertexLabelGroups);
vertexAggregators.addAll(globalVertexAggregators);
List<AggregateFunction> edgeAggregators = asAggregateFunctions(edgeLabelGroups);
edgeAggregators.addAll(globalEdgeAggregators);
return new KeyedGrouping<>(
asKeyFunctions(useVertexLabels, vertexLabelGroups), vertexAggregators,
asKeyFunctions(useEdgeLabels, edgeLabelGroups), edgeAggregators);
}
/**
* Get the default label group or return {@code null} if other label groups exist in the list.<p>
* This is used internally to check for label-specific grouping.
*
* @param labelGroups A list of label groups.
* @return The default label group, if it is the only label group and {@code null} otherwise
*/
private static LabelGroup getDefaultGroupOrNull(List<LabelGroup> labelGroups) {
if (labelGroups.size() != 1) {
return null;
} else {
LabelGroup labelGroup = labelGroups.get(0);
if (!(labelGroup.getGroupingLabel().equals(Grouping.DEFAULT_EDGE_LABEL_GROUP) ||
labelGroup.getGroupingLabel().equals(Grouping.DEFAULT_VERTEX_LABEL_GROUP))) {
return null;
}
return labelGroup;
}
}
}