AggregatorWrapperWithValue.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.flink.model.impl.operators.keyedgrouping.labelspecific;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.api.functions.AggregateDefaultValue;
import org.gradoop.flink.model.api.functions.AggregateFunction;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
/**
 * Wraps an aggregate function where the internal representation of a {@link PropertyValue} is a
 * two-element {@link List}: the identifying value at index {@code 0} and the original (raw) aggregate
 * value at index {@code 1}.
 * <p>
 * The identifying value is used to decide whether a given property value belongs to this function
 * (see {@link #isAggregated(PropertyValue)}). Values that do not carry the identifying value of this
 * instance are ignored during aggregation.
 */
abstract class AggregatorWrapperWithValue implements AggregateFunction, AggregateDefaultValue {

  /**
   * The actual aggregate function.
   */
  protected final AggregateFunction wrappedFunction;

  /**
   * The additional value stored with the property value.
   * This value is used to identify values to be aggregated.
   */
  private final PropertyValue identifyingValue;

  /**
   * Create a new instance of this wrapper.
   *
   * @param wrappedFunction  The wrapped aggregate function.
   * @param identifyingValue The additional value used to identify values to be aggregated.
   * @throws NullPointerException if any of the arguments is {@code null}.
   */
  AggregatorWrapperWithValue(AggregateFunction wrappedFunction, PropertyValue identifyingValue) {
    this.wrappedFunction = Objects.requireNonNull(wrappedFunction, "wrappedFunction");
    this.identifyingValue = Objects.requireNonNull(identifyingValue, "identifyingValue");
  }

  /**
   * Unwrap a {@link PropertyValue} wrapped by this class.
   * <p>
   * A null property value is passed through unchanged. Any other value is expected to be in the
   * wrapped list form produced by {@link #wrap(PropertyValue)}; this method does not verify that.
   *
   * @param wrappedValue The wrapped property value.
   * @return The unwrapped (raw) property value.
   * @see #wrap(PropertyValue)
   */
  protected PropertyValue unwrap(PropertyValue wrappedValue) {
    if (wrappedValue.isNull()) {
      return wrappedValue;
    }
    // Index 1 holds the raw value, index 0 the identifying value (see wrap).
    return wrappedValue.getList().get(1);
  }

  /**
   * Wrap a {@link PropertyValue} into an internal representation used by this class.
   *
   * @param rawValue The raw property value.
   * @return The wrapped property value, a list of the identifying value followed by the raw value.
   * @see #unwrap(PropertyValue)
   */
  protected PropertyValue wrap(PropertyValue rawValue) {
    return PropertyValue.create(Arrays.asList(identifyingValue, rawValue));
  }

  /**
   * Check if a {@link PropertyValue} should be considered by this function.
   * <p>
   * A value is considered if it is a list of exactly two elements whose first element equals the
   * identifying value of this instance.
   *
   * @param value The property value.
   * @return {@code true}, if the value should be aggregated by this function.
   */
  protected boolean isAggregated(PropertyValue value) {
    if (!value.isList()) {
      return false;
    }
    final List<PropertyValue> values = value.getList();
    if (values.size() != 2) {
      return false;
    }
    return identifyingValue.equals(values.get(0));
  }

  /**
   * Aggregate an increment into the current aggregate using the wrapped function.
   * <p>
   * Both values are unwrapped before being passed to the wrapped function and the result is wrapped
   * again. Increments that do not belong to this function leave the aggregate unchanged.
   *
   * @param aggregate The current aggregate, either a null property value or a wrapped value.
   * @param increment The increment to add.
   * @return The new wrapped aggregate, or {@code aggregate} itself if the increment is not handled
   * by this function.
   */
  @Override
  public PropertyValue aggregate(PropertyValue aggregate, PropertyValue increment) {
    if (isAggregated(increment)) {
      return wrap(wrappedFunction.aggregate(unwrap(aggregate), unwrap(increment)));
    } else {
      return aggregate;
    }
  }

  @Override
  public String getAggregatePropertyKey() {
    return wrappedFunction.getAggregatePropertyKey();
  }

  /**
   * Unwrap the final aggregate and delegate post-processing to the wrapped function.
   *
   * @param result The final aggregate in its wrapped form.
   * @return The post-processed raw value, or {@code null} if {@code result} is not a value wrapped
   * by this function.
   */
  @Override
  public PropertyValue postAggregate(PropertyValue result) {
    if (isAggregated(result)) {
      return wrappedFunction.postAggregate(unwrap(result));
    } else {
      return null;
    }
  }

  /**
   * Get the default value of the wrapped function in the internal representation of this class.
   * <p>
   * The default is wrapped so that it is recognized by {@link #isAggregated(PropertyValue)} and can
   * therefore be handled by {@link #aggregate(PropertyValue, PropertyValue)} and
   * {@link #postAggregate(PropertyValue)} like any other aggregate value. A raw default would be
   * rejected by {@link #postAggregate(PropertyValue)} and could not be unwrapped.
   *
   * @return The wrapped default value, or {@link PropertyValue#NULL_VALUE} if the wrapped function
   * does not provide a default value.
   */
  @Override
  public PropertyValue getDefaultValue() {
    if (wrappedFunction instanceof AggregateDefaultValue) {
      return wrap(((AggregateDefaultValue) wrappedFunction).getDefaultValue());
    }
    return PropertyValue.NULL_VALUE;
  }
}