// TimeStampKeyFunction.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.keyedgrouping.keys;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.gradoop.common.model.impl.properties.PropertyValue;
import org.gradoop.flink.model.api.functions.KeyFunctionWithDefaultValue;
import org.gradoop.temporal.model.api.TimeDimension;
import org.gradoop.temporal.model.impl.pojo.TemporalElement;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.temporal.ChronoField;
import java.time.temporal.TemporalField;
import java.util.Objects;
import static java.time.ZoneOffset.UTC;
/**
* A key function extracting a time stamp of a {@link TemporalElement}.<p>
* An optional {@link TemporalField} parameter can be given to this key function. In this case the time stamp
* will be converted to a date (with a time zone set to {@link java.time.ZoneOffset#UTC UTC}) and the field
* (e. g. {@link ChronoField#MONTH_OF_YEAR month}) will be extracted from that date.<p>
* When no {@link TemporalField} is given, the time stamp will be used as is, in milliseconds since unix
* epoch.<p>
* The final grouping key will be stored on the super element as a property with key
* {@code time_INTERVAL_FIELD_CALCULATEDFIELD} where {@code INTERVAL} is the {@link TimeDimension},
* {@code FIELD} the {@link TimeDimension.Field} and {@code CALCULATEDFIELD} the {@link TemporalField}
* extracted from the element. When no {@link TemporalField} is given, the property will just be called
* {@code time_INTERVAL_FIELD}.<p>
* When the extracted {@link TimeDimension.Field field} of the {@link TimeDimension} is set to a default
* value and a {@link TemporalField} was set, a default value ({@code -1}) will be returned instead.
* In this case no property will be set.
*
* @param <T> The type of the elements to group.
*/
public class TimeStampKeyFunction<T extends TemporalElement> implements KeyFunctionWithDefaultValue<T, Long> {

  /**
   * The time dimension of the temporal element to extract.
   */
  private final TimeDimension timeDimension;

  /**
   * The field of that dimension to extract.
   */
  private final TimeDimension.Field timeDimensionField;

  /**
   * The time field to calculate. May be {@code null}, in which case the raw time stamp is used as key.
   */
  private final TemporalField fieldOfTimeStamp;

  /**
   * The property key used to store the grouping key on the super-element.
   */
  private final String targetPropertyKey;

  /**
   * Create a new instance of this grouping key function.
   *
   * @param timeDimension      The time dimension of the temporal element to consider.
   * @param timeDimensionField The field of that time dimension to consider.
   * @param fieldOfTimeStamp   The time field of that field to calculate. May be {@code null},
   *                           in that case nothing will be calculated, the time stamp will be
   *                           returned as is.
   * @throws NullPointerException when the time dimension or the time dimension field is {@code null}.
   */
  public TimeStampKeyFunction(TimeDimension timeDimension,
    TimeDimension.Field timeDimensionField, TemporalField fieldOfTimeStamp) {
    this.timeDimension = Objects.requireNonNull(timeDimension, "The time dimension must not be null.");
    this.timeDimensionField =
      Objects.requireNonNull(timeDimensionField, "The time dimension field must not be null.");
    this.fieldOfTimeStamp = fieldOfTimeStamp;
    // The property key has the form time_INTERVAL_FIELD, with _CALCULATEDFIELD appended only when a
    // temporal field is extracted.
    this.targetPropertyKey = "time_" + timeDimension + "_" + timeDimensionField +
      (fieldOfTimeStamp != null ? "_" + fieldOfTimeStamp : "");
  }

  /**
   * Extract the grouping key from an element.<p>
   * Without a {@link TemporalField} the selected time stamp is returned unchanged. With a
   * {@link TemporalField} the time stamp is interpreted as milliseconds since unix epoch, converted to
   * a date in UTC and the field is read from that date; if the time stamp is the default value of the
   * selected field, the {@link #getDefaultKey() default key} is returned instead.
   *
   * @param element The element to extract the key from.
   * @return The time stamp, the calculated temporal field or the default key.
   * @throws UnsupportedOperationException when the time dimension or its field is not supported.
   */
  @Override
  public Long getKey(T element) {
    final Tuple2<Long, Long> interval;
    switch (timeDimension) {
    case TRANSACTION_TIME:
      interval = element.getTransactionTime();
      break;
    case VALID_TIME:
      interval = element.getValidTime();
      break;
    default:
      throw new UnsupportedOperationException(
        "Time interval not supported by this element: " + timeDimension);
    }
    final Long fieldValue;
    switch (timeDimensionField) {
    case FROM:
      fieldValue = interval.f0;
      // A default time stamp carries no date information, so no temporal field can be calculated.
      if (fieldValue.equals(TemporalElement.DEFAULT_TIME_FROM) && (fieldOfTimeStamp != null)) {
        return getDefaultKey();
      }
      break;
    case TO:
      fieldValue = interval.f1;
      // A default time stamp carries no date information, so no temporal field can be calculated.
      if (fieldValue.equals(TemporalElement.DEFAULT_TIME_TO) && (fieldOfTimeStamp != null)) {
        return getDefaultKey();
      }
      break;
    default:
      throw new UnsupportedOperationException("Field is not supported: " + timeDimensionField);
    }
    if (fieldOfTimeStamp == null) {
      return fieldValue;
    }
    final LocalDateTime date = LocalDateTime.ofInstant(Instant.ofEpochMilli(fieldValue), UTC);
    return fieldOfTimeStamp.getFrom(date);
  }

  /**
   * Store a grouping key as a property on an element. The property is not set when a
   * {@link TemporalField} is configured and the key is equal to the {@link #getDefaultKey() default key}.
   *
   * @param element The element to store the key on.
   * @param key     The key to store, has to be a {@link Long}.
   * @throws IllegalArgumentException when the key is {@code null} or not a {@link Long}.
   */
  @Override
  public void addKeyToElement(T element, Object key) {
    if (!(key instanceof Long)) {
      // instanceof is also false for null, so the type name has to be determined null-safe here.
      final String typeName = key == null ? "null" : key.getClass().getSimpleName();
      throw new IllegalArgumentException("Invalid type for key: " + typeName);
    }
    // Do not set the key if field extraction is enabled and the key is -1
    if (fieldOfTimeStamp == null || !getDefaultKey().equals(key)) {
      element.setProperty(targetPropertyKey, PropertyValue.create(key));
    }
  }

  /**
   * Get the type of the keys extracted by this function.
   *
   * @return The type information for {@link Long}.
   */
  @Override
  public TypeInformation<Long> getType() {
    return BasicTypeInfo.LONG_TYPE_INFO;
  }

  /**
   * Get the default key of this function. Without a {@link TemporalField} this is the default time stamp
   * of the selected {@link TimeDimension.Field}, otherwise it is {@code -1}.
   *
   * @return The default key.
   * @throws UnsupportedOperationException when the time dimension field is not supported.
   */
  @Override
  public Long getDefaultKey() {
    if (fieldOfTimeStamp == null) {
      switch (timeDimensionField) {
      case FROM:
        return TemporalElement.DEFAULT_TIME_FROM;
      case TO:
        return TemporalElement.DEFAULT_TIME_TO;
      default:
        throw new UnsupportedOperationException("Field not supported: " + timeDimensionField);
      }
    }
    return -1L;
  }
}