BaseCalculateDegrees.java
/*
* Copyright © 2014 - 2021 Leipzig University (Database Research Group)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.gradoop.temporal.model.impl.operators.metric.functions;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;
import org.gradoop.common.model.impl.id.GradoopId;
import java.util.Map;
import java.util.TreeMap;
/**
* Base class for calculating the degree evolution by the given degree tree.
*/
public abstract class BaseCalculateDegrees {
/**
* Exception message string.
*/
private static final String TEMPORAL_VIOLATION_MSG = "Last timestamp [%d] is not smaller than the " +
"current [%d] for vertex with id [%s]. A chronological order of timestamps is mandatory. Please " +
"check the temporal integrity of your graph in the given time domain. The operator " +
"TemporalGraph#updateEdgeValidity() can be used to update an edges validity to ensure its integrity.";
/**
* Function calculates and collects the degree evolution for a given vertex and degree tree. It uses the
* from and to times of the vertex. If these times are not known, default timestamps can be used.
*
* @param vertexId the vertex id
* @param degreeTree the degree tree of that vertex
* @param vertexFromTime the from time of that vertex
* @param vertexToTime the to time of that vertex
* @param collector the collector that collects the resulting degree tuples
*/
protected void calculateDegreeAndCollect(GradoopId vertexId, TreeMap<Long, Integer> degreeTree,
Long vertexFromTime, Long vertexToTime, Collector<Tuple4<GradoopId, Long, Long, Integer>> collector) {
// we store for each timestamp the current degree
int degree = 0;
// first degree 0 is from t_from(v) to the first occurrence of a start timestamp
Long lastTimestamp = vertexFromTime;
for (Map.Entry<Long, Integer> entry : degreeTree.entrySet()) {
// check integrity
if (lastTimestamp > entry.getKey()) {
// This should not happen, seems that a temporal constraint is violated
throw new IllegalArgumentException(String.format(TEMPORAL_VIOLATION_MSG, lastTimestamp,
entry.getKey(), vertexId));
}
if (lastTimestamp.equals(entry.getKey())) {
// First timestamp in tree is equal to the lower interval bound of the vertex
degree += entry.getValue();
continue;
}
// The payload is 0, means the degree does not change and the intervals can be merged
if (entry.getValue() != 0) {
collector.collect(new Tuple4<>(vertexId, lastTimestamp, entry.getKey(), degree));
degree += entry.getValue();
// remember the last timestamp since it is the first one of the next interval
lastTimestamp = entry.getKey();
}
}
// last degree is 0 from last occurence of timestamp to t_to(v)
if (lastTimestamp < vertexToTime) {
collector.collect(new Tuple4<>(vertexId, lastTimestamp, vertexToTime, degree));
} else if (lastTimestamp > vertexToTime) {
// This should not happen, seems that a temporal constraint is violated
throw new IllegalArgumentException(String.format(TEMPORAL_VIOLATION_MSG, lastTimestamp, vertexToTime,
vertexId));
} // else, the ending bound of the vertex interval equals the last timestamp of the edges
}
}