AccumuloQueryHolder.java

/*
 * Copyright © 2014 - 2021 Leipzig University (Database Research Group)
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.gradoop.storage.accumulo.impl.predicate.query;

import org.apache.accumulo.core.data.Range;
import org.gradoop.common.model.api.entities.Element;
import org.gradoop.common.model.impl.id.GradoopId;
import org.gradoop.common.model.impl.id.GradoopIdSet;
import org.gradoop.storage.accumulo.impl.predicate.filter.api.AccumuloElementFilter;
import org.gradoop.storage.accumulo.utils.KryoUtils;
import org.gradoop.storage.common.predicate.query.ElementQuery;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

/**
 * Accumulo predicate filter definition, this is a internal model, should not be used outside
 *
 * @param <T> element type
 */
public class AccumuloQueryHolder<T extends Element> implements Serializable {

  /**
   * Query ranges in accumulo table, should be serializable
   */
  private final byte[] queryRanges;

  /**
   * Reduce filter for element
   */
  private final AccumuloElementFilter<T> reduceFilter;

  /**
   * Accumulo predicate instance, low level api for store implement
   *
   * @param logicalRanges accumulo logical ranges for element table,
   * @param reduceFilter query reduce filter
   *                     only those in predicate should be return from tserver.
   *                     if null, return all in range
   */
  private AccumuloQueryHolder(
    @Nullable List<Range> logicalRanges,
    @Nullable AccumuloElementFilter<T> reduceFilter
  ) {
    RangeWrapper wrapper = new RangeWrapper();
    wrapper.ranges = logicalRanges;
    this.queryRanges = wrapper.encrypt();
    this.reduceFilter = reduceFilter;
  }

  /**
   * Create a predicate within a certain id ranges
   *
   * @param query element query
   * @param <T>   element type
   * @return accumulo predicate
   */
  public static <T extends Element> AccumuloQueryHolder<T> create(
    @Nonnull ElementQuery<AccumuloElementFilter<T>> query
  ) {
    List<Range> ranges = Range.mergeOverlapping(Optional.ofNullable(query.getQueryRanges())
      .orElse(GradoopIdSet.fromExisting())
      .stream()
      .map(GradoopId::toString)
      .map(Range::exact)
      .collect(Collectors.toList()));
    return new AccumuloQueryHolder<>(
      query.getQueryRanges() == null ? null : ranges,
      query.getFilterPredicate());
  }

  /**
   * Create a predicate within a certain accumulo id ranges
   *
   * @param idRanges      gradoop row-id ranges for query element
   * @param reduceFilter  reducer filter logic
   * @param <T>           element type
   * @return accumulo predicate
   */
  public static <T extends Element> AccumuloQueryHolder<T> create(
    @Nonnull List<Range> idRanges,
    @Nullable AccumuloElementFilter<T> reduceFilter) {
    if (idRanges.isEmpty()) {
      throw new IllegalArgumentException("id range is empty");
    }
    return new AccumuloQueryHolder<>(idRanges, reduceFilter);
  }

  /**
   * Get query ranges by anti-encrypt wrapper
   *
   * @return seek range
   */
  public List<Range> getQueryRanges() {
    return queryRanges == null ? null : RangeWrapper.decrypt(queryRanges).ranges;
  }

  /**
   * Get reduce filter
   *
   * @return accumulo element filter
   */
  public AccumuloElementFilter<T> getReduceFilter() {
    return reduceFilter;
  }

  @Override
  public String toString() {
    List<String> ranges = getQueryRanges() == null ? null :
      getQueryRanges().stream()
        .map(it -> it == null ? null : String.format("%s:%s",
          it.getStartKey().getRow(),
          it.getEndKey().getRow()))
        .collect(Collectors.toList());
    return String.format("range=%1$s, filter=%2$s", ranges, getReduceFilter());
  }

  /**
   * Range wrapper definition, just for request transport
   */
  private static class RangeWrapper {

    /**
     * Query ranges, may be null
     */
    private List<Range> ranges;

    /**
     * Encrypt as byte array
     *
     * @return byte array result
     */
    private byte[] encrypt() {
      try {
        return KryoUtils.dumps(this);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }

    /**
     * Decrypted from byte array
     *
     * @param data encrypted data
     * @return range wrapper instance
     */
    private static RangeWrapper decrypt(byte[] data) {
      try {
        return KryoUtils.loads(data, RangeWrapper.class);
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }

}