AggregatorWrapperWithValue.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.flink.model.impl.operators.keyedgrouping.labelspecific;

import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.api.functions.AggregateDefaultValue;
import org.gradoop.flink.model.api.functions.AggregateFunction;

import java.util.Arrays;
import java.util.List;
import java.util.Objects;

/**
 * Wraps an aggregate function where the internal representation of a {@link PropertyValue} is a {@link List}
 * containing some property value and the original aggregate value.
 * The additional value is used to identify aggregate values used by this function.
 */
abstract class AggregatorWrapperWithValue implements AggregateFunction, AggregateDefaultValue {

  /**
   * The actual aggregate function.
   */
  protected final AggregateFunction wrappedFunction;

  /**
   * The additional value stored with the property value.
   * This value is used to identify values to be aggregated.
   */
  private final PropertyValue identifyingValue;

  /**
   * Create a new instance of this wrapper.
   *
   * @param wrappedFunction The wrapped aggregate function.
   * @param identifyingValue The additional value used to identify values to be aggregated.
   */
  AggregatorWrapperWithValue(AggregateFunction wrappedFunction, PropertyValue identifyingValue) {
    this.wrappedFunction = Objects.requireNonNull(wrappedFunction);
    this.identifyingValue = Objects.requireNonNull(identifyingValue);
  }

  /**
     * Unwrap a {@link PropertyValue} wrapped by this class.
     *
     * @param wrappedValue The wrapped property value.
     * @return The unwrapped (raw) property value.
     * @see #wrap(PropertyValue)
     */
  protected PropertyValue unwrap(PropertyValue wrappedValue) {
    if (wrappedValue.isNull()) {
      return wrappedValue;
    }
    return wrappedValue.getList().get(1);
  }

  /**
     * Wrap a {@link PropertyValue} into an internal representation used by this class.
     *
     * @param rawValue The raw property value.
     * @return The wrapped property value.
     * @see #unwrap(PropertyValue)
     */
  protected PropertyValue wrap(PropertyValue rawValue) {
    return PropertyValue.create(Arrays.asList(identifyingValue, rawValue));
  }

  /**
     * Check if a {@link PropertyValue} should be considered by this function.
     *
     * @param value The property value.
     * @return {@code true}, if the value should be aggregated by this function.
     */
  protected boolean isAggregated(PropertyValue value) {
    if (!value.isList()) {
      return false;
    }
    final List<PropertyValue> values = value.getList();
    if (values.size() != 2) {
      return false;
    }
    return identifyingValue.equals(values.get(0));
  }

  @Override
  public PropertyValue aggregate(PropertyValue aggregate, PropertyValue increment) {
    if (isAggregated(increment)) {
      return wrap(wrappedFunction.aggregate(unwrap(aggregate), unwrap(increment)));
    } else {
      return aggregate;
    }
  }

  @Override
  public String getAggregatePropertyKey() {
    return wrappedFunction.getAggregatePropertyKey();
  }

  @Override
  public PropertyValue postAggregate(PropertyValue result) {
    if (isAggregated(result)) {
      return wrappedFunction.postAggregate(unwrap(result));
    } else {
      return null;
    }
  }

  @Override
  public PropertyValue getDefaultValue() {
    return wrappedFunction instanceof AggregateDefaultValue ?
      ((AggregateDefaultValue) wrappedFunction).getDefaultValue() : PropertyValue.NULL_VALUE;
  }
}