They gave me this test class that fails at the 58th minute of every hour and for the next two minutes.
@Test
public void testMultipleTimeSeriesMovingAverage() throws Exception {
table.clear();
long ts = System.currentTimeMillis();
List<Tag> tags1 = new ArrayList<>();
tags1.add(new Tag("host", "r01n01"));
List<Tag> tags2 = new ArrayList<>();
tags2.add(new Tag("host", "r01n02"));
for (int i = 0; i < 100; i++) {
ts += 1000;
Metric m = new Metric("sys.cpu.user", ts, i * 1.0D, tags1);
byte[] row = MetricAdapter.encodeRowKey(m);
Key k = new Key(row, tags1.get(0).join().getBytes(StandardCharsets.UTF_8),
MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Value v = new Value(MetricAdapter.encodeValue(m.getValue().getMeasure()));
table.put(k, v);
Metric m2 = new Metric("sys.cpu.user", ts, i * 2.0D, tags2);
byte[] row2 = MetricAdapter.encodeRowKey(m2);
Key k2 = new Key(row2, tags2.get(0).join().getBytes(StandardCharsets.UTF_8),
MetricAdapter.encodeColQual(ts, ""), new byte[0], ts);
Value v2 = new Value(MetricAdapter.encodeValue(m2.getValue().getMeasure()));
table.put(k2, v2);
}
SortedMapIterator source = new SortedMapIterator(table);
TimeSeriesGroupingIterator iter = new TimeSeriesGroupingIterator();
IteratorSetting settings = new IteratorSetting(100, TimeSeriesGroupingIterator.class);
settings.addOption(TimeSeriesGroupingIterator.FILTER, "0.20,0.20,0.20,0.20,0.20");
iter.init(source, settings.getOptions(), SCAN_IE);
iter.seek(new Range(), EMPTY_COL_FAMS, true);
// this section changed when the key structure changed so that identical
// colFam values sorted consecutively within an given time period
for (int i = 4; i < 100; i++) {
checkNextResult(iter, new double[] { i - 4, i - 3, i - 2, i - 1, i });
}
for (int i = 4; i < 100; i++) {
checkNextResult(iter, new double[] { (i - 4) * 2, (i - 3) * 2, (i - 2) * 2, (i - 1) * 2, i * 2 });
}
assertFalse(iter.hasTop());
}
private void checkNextResult(TimeSeriesGroupingIterator iter, double[] expectedValues) throws IOException {
assertTrue(iter.hasTop());
LOG.trace("Expected: {}", expectedValues);
LOG.trace("Getting value for Key {}", iter.getTopKey());
double expected = expectedMovingAverage(expectedValues);
assertEquals(expected, MetricAdapter.decodeValue(iter.getTopValue().get()), 0.0D);
iter.next();
}
This is the class that goes to test. Unfortunately, I was unable to fully understand the code that is tested by the method.
public class TimeSeriesGroupingIterator extends WrappingIterator {
/**
* Object representing a single time series and its computed value. The value
* will be computed when the time series its values reach the target size.
* Calling getAndRemoveAnswer will return the answer and clear it's internal
* representation.
*
*/
private static class TimeSeries extends LinkedList<Pair<Key, Double>> implements Serializable {
private static final long serialVersionUID = 1L;
private int targetSize;
private transient TimeSeriesGroupingIterator iter;
private Double answer;
public TimeSeries(TimeSeriesGroupingIterator iter, int size) {
this.iter = iter;
this.targetSize = size;
}
public void add(Key key, Double value) {
super.add(new Pair<>(key, value));
if (super.size() == this.targetSize) {
recompute();
// remove first key as it is no longer needed for any
// computations
Pair<Key, Double> e = this.pollFirst();
LOG.trace("Removing first entry {}", e.getFirst());
}
}
public Double getAndRemoveAnswer() {
try {
return this.answer;
} finally {
this.answer = null;
}
}
private void recompute() {
this.answer = iter.compute(this);
LOG.trace("recomputed, new answer: {}", this.answer);
}
}
/**
*
* Object representing a group of time series, where uniqueness is defined by
* the name of the metric and a unique tag set.
*
*/
private static class TimeSeriesGroup extends HashMap<Metric, TimeSeries> implements Iterable<Pair<Key, Double>> {
private static final long serialVersionUID = 1L;
private transient List<Pair<Key, Double>> answers;
public TimeSeriesGroup() {
this.answers = new ArrayList<>();
}
@Override
public TimeSeries put(Metric key, TimeSeries value) {
TimeSeries old = super.put(key, value);
Double answer = value.getAndRemoveAnswer();
if (answer != null) {
answers.add(new Pair<>(value.getLast().getFirst(), answer));
}
return old;
}
/**
* Iterates over the time series group and returns for each time series the last
* key and the filtered value.
*/
@Override
public Iterator<Pair<Key, Double>> iterator() {
try {
return answers.iterator();
} finally {
answers = new ArrayList<>();
}
}
}
private static final Logger LOG = LoggerFactory.getLogger(TimeSeriesGroupingIterator.class);
public static final String FILTER = "sliding.window.filter";
private TimeSeriesGroup series = new TimeSeriesGroup();
protected Double[] filters = null;
private Key topKey = null;
private Value topValue = null;
private Iterator<Pair<Key, Double>> seriesIterator = null;
protected Double compute(List<Pair<Key, Double>> values) {
double result = 0D;
int i = 0;
for (Pair<Key, Double> e : values) {
LOG.trace("compute - key:{}, value: {}", e.getFirst(), e.getSecond());
result += (filters[i] * e.getSecond());
i++;
}
LOG.trace("compute - result: {}", result);
return result;
}
@Override
public Key getTopKey() {
return topKey;
}
@Override
public Value getTopValue() {
return topValue;
}
@Override
public boolean hasTop() {
LOG.trace("hasTop()");
return (null != topKey && null != topValue);
}
private void setTopKeyValue() {
while (!seriesIterator.hasNext() && super.hasTop()) {
try {
refillBuffer();
seriesIterator = series.iterator();
} catch (IOException e) {
throw new RuntimeException("Error filling buffer", e);
}
}
if (seriesIterator.hasNext()) {
Pair<Key, Double> p = seriesIterator.next();
topKey = p.getFirst();
topValue = new Value(MetricAdapter.encodeValue(p.getSecond()));
} else {
topKey = null;
topValue = null;
}
}
@Override
public void next() throws IOException {
LOG.trace("next()");
setTopKeyValue();
}
@Override
public void init(SortedKeyValueIterator<Key, Value> source, Map<String, String> options, IteratorEnvironment env)
throws IOException {
super.init(source, options, env);
String filterOption = options.getOrDefault(FILTER, null);
if (null == filterOption) {
throw new IllegalArgumentException("Window size must be specified.");
}
String[] split = StringUtils.split(filterOption, ',');
filters = new Double[split.length];
for (int i = 0; i < split.length; i++) {
filters[i] = Double.parseDouble(split[i]);
}
LOG.trace("init - filter: {}", Arrays.toString(filters));
}
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException {
super.seek(range, columnFamilies, inclusive);
series.clear();
seriesIterator = series.iterator();
setTopKeyValue();
}
/**
* This will parse the next set of keys with the same timestamp (encoded in the
* row) from the underlying source.
*
* @throws IOException
*/
private void refillBuffer() throws IOException {
LOG.trace("refill()");
Long time = null;
while (super.hasTop()) {
Key k = super.getTopKey();
Long newTime = k.getTimestamp();
if (time == null) {
time = newTime;
}
if (time.equals(newTime)) {
try {
Metric m = MetricAdapter.parse(k, super.getTopValue());
timely.model.Value v = m.getValue();
m.setValue(null);
TimeSeries values = series.get(m);
if (null == values) {
LOG.trace("Creating new time series {}", m);
values = new TimeSeries(this, filters.length);
}
LOG.trace("Adding value {} to series {}", v.getMeasure(), m);
values.add(k, v.getMeasure());
// always re-put the metric back into the TimeSeriesGroup as
// it
// maintains a list of prepared answers
series.put(m, values);
} catch (Exception e) {
LOG.error("Error: {} parsing metric at key: {}", e.getMessage(), k.toString());
}
super.next();
} else {
break;
}
}
LOG.trace("Buffer contents: {}", series);
}
}
To run the test I use the command
mvn -Dtest=nameClass#nameMethod test
This is a graph of the test failure on 1000 iterations. Does anyone know what this failure depends on?
Aucun commentaire:
Enregistrer un commentaire