// TimeIntervalKeyFunction.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.keyedgrouping.keys;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.gradoop.flink.model.api.functions.KeyFunctionWithDefaultValue;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.pojo.TemporalElement;
import java.util.Objects;
/**
 * A key function extracting the time interval of a {@link TimeDimension} from a
 * {@link TemporalElement}.
 * <p>
 * The key is a {@link Tuple2} of two {@link Long} values. {@link #getKey(TemporalElement)} reads it
 * from the element and {@link #addKeyToElement(TemporalElement, Object)} writes it back through the
 * element's from/to setters of the configured dimension.
 *
 * @param <T> The type of the temporal elements.
 */
public class TimeIntervalKeyFunction<T extends TemporalElement>
  implements KeyFunctionWithDefaultValue<T, Tuple2<Long, Long>> {

  /**
   * The time dimension to extract from.
   */
  private final TimeDimension timeDimension;

  /**
   * Create a new instance of this key function.
   *
   * @param timeDimension The time dimension to extract.
   * @throws NullPointerException if {@code timeDimension} is {@code null}.
   */
  public TimeIntervalKeyFunction(TimeDimension timeDimension) {
    this.timeDimension = Objects.requireNonNull(timeDimension, "timeDimension");
  }

  /**
   * Reads the interval of the configured time dimension from the given element.
   *
   * @param element The element to read the interval from.
   * @return The value of {@code getValidTime()} or {@code getTransactionTime()} of the element,
   *         depending on the configured dimension.
   * @throws UnsupportedOperationException if the configured dimension is neither valid time nor
   *                                       transaction time.
   */
  @Override
  public Tuple2<Long, Long> getKey(T element) {
    switch (timeDimension) {
    case VALID_TIME:
      return element.getValidTime();
    case TRANSACTION_TIME:
      return element.getTransactionTime();
    default:
      throw new UnsupportedOperationException("Time dimension not supported by this element: " +
        timeDimension);
    }
  }

  /**
   * Provides the default key, the interval {@code (Long.MIN_VALUE, Long.MAX_VALUE)}.
   *
   * @return A new tuple holding {@link Long#MIN_VALUE} and {@link Long#MAX_VALUE}.
   */
  @Override
  public Tuple2<Long, Long> getDefaultKey() {
    return new Tuple2<>(Long.MIN_VALUE, Long.MAX_VALUE);
  }

  /**
   * Writes an interval key back to the element, using the setters of the configured dimension.
   * <p>
   * The key has to be a {@link Tuple2} whose two fields are both {@link Long} values. A
   * {@code null} key or {@code null} tuple fields are rejected with the same exception as any
   * other invalid key, instead of failing with a {@link NullPointerException} while the error
   * message is being built.
   *
   * @param element The element to update.
   * @param key     The key to store on the element.
   * @throws IllegalArgumentException if the key is not a tuple of two {@link Long} values.
   */
  @Override
  public void addKeyToElement(T element, Object key) {
    if (!(key instanceof Tuple2)) {
      throw new IllegalArgumentException("Invalid type for key: " + describeType(key));
    }
    final Tuple2<?, ?> interval = (Tuple2<?, ?>) key;
    final Object from = interval.f0;
    final Object to = interval.f1;
    if (!(from instanceof Long) || !(to instanceof Long)) {
      throw new IllegalArgumentException("Invalid types for tuple key: " +
        describeType(from) + ", " + describeType(to));
    }
    switch (timeDimension) {
    case VALID_TIME:
      element.setValidFrom((Long) from);
      element.setValidTo((Long) to);
      break;
    case TRANSACTION_TIME:
      element.setTxFrom((Long) from);
      element.setTxTo((Long) to);
      break;
    default:
      // Any other dimension leaves the element untouched (unlike getKey, which throws).
      // NOTE(review): confirm whether silently ignoring the key here is intended.
      break;
    }
  }

  /**
   * Provides the type information of the key, built from the {@code long} class tokens.
   *
   * @return The tuple type information returned by
   *         {@code TupleTypeInfo.getBasicTupleTypeInfo(Long.TYPE, Long.TYPE)}.
   */
  @Override
  public TypeInformation<Tuple2<Long, Long>> getType() {
    return TupleTypeInfo.getBasicTupleTypeInfo(Long.TYPE, Long.TYPE);
  }

  /**
   * Describes this key function by the string form of its time dimension.
   *
   * @return The result of {@code toString()} on the configured time dimension.
   */
  @Override
  public String toString() {
    return timeDimension.toString();
  }

  /**
   * Names the runtime type of a value for use in error messages, tolerating {@code null}.
   *
   * @param value The value to describe, may be {@code null}.
   * @return The simple class name of the value, or {@code "null"} if there is no value.
   */
  private static String describeType(Object value) {
    return value == null ? "null" : value.getClass().getSimpleName();
  }
}