Оптимизация функций вычисления min и max на Hadoop

Для сбора и анализа статистики функционирования LG Cloud было решено использовать Hadoop, который позволяет оперировать гигантскими объёмами данных и гибко масштабировать систему. Философия Hadoop — приближение кода к данным. Заключается она в том, что данные распределяются по кластеру и обработка их по возможности происходит на тех же нодах, где данные находятся физически (принцип data locality). Так как размер кода гораздо меньше размера данных, общая производительнось оказывается более высокой по сравнению с «традиционным» кластером.

Интерфейсом к данным в Hadoop в нашем случае служит Hive. Задачи (jobs) в Hadoop представлены как Mappers and Reducers, следуя модели MapReduce. Для непосредственной реализации алгоритмов MapReduce был выбран python модуль Dumbo.

Анализ статистики заключается в обработке текстовых данных — логов с различных серверов. Mapper, в общем случае, выбирает нужные значения из строчек лог-файла, а Reducer применяет нужную функцию ко множеству этих значений.

В процессе работы возникла задача нахождения минимума и максимума периодов времени. Исходные данные были в виде списка целочисленных значений unix time, полученных от Mapper. Для того, чтобы из входного списка сделать список интервалов между соседними элементами, нужно читать исходный список парами и находить разницу между значениями в Reducer. Причем индексы списка для таких пар должны быть вида (n, n+1), при n от 0 до m-1, где m – длина списка.

Учитывая высокую нагруженность системы и ограниченные ресурсы, хотелось найти наилучший вариант вычисления минимума, максимума и суммы. Для этого был написан тестовый модуль сравнивающий built-in max, min, sum, и несколько самописных функций. И был использован отличный инструмент — модуль itertoos стандартной библиотеки, который предоставляет быстрые, эффективно использующие память, алгоритмы итераторов.

Начальная реализация нашего списка парами со смещением выглядела как генераторная функция.

15    def pairs1(data):
16        iterator = iter(data)
17        item = iterator.next() # throws StopIteration if empty.
18        for next in iterator:
19            yield (item,next)
20            item = next

Байткод для такой функции выглядит так

Disassembly of pairs:
16    0 LOAD_GLOBAL 0 (iter)
      3 LOAD_FAST 0 (data)
      6 CALL_FUNCTION 1 # Вызываем iter
      9 STORE_FAST 1 (iterator) # Сохраняем объект итератора в iterator
17    12 LOAD_FAST 1 (iterator)
      15 LOAD_ATTR 1 (next) # Ищем в словаре метод next
      18 CALL_FUNCTION 0
      21 STORE_FAST 2 (item)
      24 SETUP_LOOP 31 (to 58)
18    27 LOAD_FAST 1 (iterator)
      30 GET_ITER
   >> 31 FOR_ITER 23 (to 57)
      34 STORE_FAST 3 (next)
19    37 LOAD_FAST 2 (item)
      40 LOAD_FAST 3 (next)
      43 BUILD_TUPLE 2 # Создание tuple
      46 YIELD_VALUE
      47 POP_TOP
20    48 LOAD_FAST 3 (next)
      51 STORE_FAST 2 (item)
      54 JUMP_ABSOLUTE 31
   >> 57 POP_BLOCK
   >> 58 LOAD_CONST 0 (None)
      61 RETURN_VALUE

Интересно было насколько хорош будет itertools для нашей задачи. Как результат — ещё одна функция с тем же функционалом

 8    from itertools import izip, islice
 9    def pairs2(data):
10        return izip(data, islice(data, 1, None))

В нашем окружении установлен Python 2.7.3, в котором реализация itertools.izip() и zip() одинакова. Это было сделано при разработке Python 3.0a в 2008 году Move itertools izip() code to builtins as zip(). Импорт izip() сделан только для наглядности.

Таким образом, переложили полностью на itertools такие операции, фактически реализованные на C, как BUILD_TUPLE, GET_ITER, и избавляясь от поиска атрибута в объекте LOAD_ATTR.

>>> dis.dis(pairs1)
 9    0 LOAD_GLOBAL 0 (izip)
      3 LOAD_FAST 0 (data)
      6 LOAD_GLOBAL 1 (islice)
      9 LOAD_FAST 0 (data)
      12 LOAD_CONST 1 (1)
      15 LOAD_CONST 0 (None)
      18 CALL_FUNCTION 3
      21 CALL_FUNCTION 2
      24 RETURN_VALUE

Несмотря на то, что байткод короче, мы лишь сделали вычисления более «ленивыми», и далее будем проверять на практике, какой метод быстрее.

Результаты тестирования создания пар:

>>> t = range(999999)
>>> %timeit list(pairs1(t))
10 loops, best of 3: 177 ms per loop
>>> %timeit list(pairs2(t))
10 loops, best of 3: 130 ms per loop

Разница в скорости между алгоритмами — десятки миллисекунд на списке в миллион целочисленных значений. И всё-таки itertools быстрее собственной генераторной функции. Вопрос, в чем причина и как влияет создание объекта generator на скорость, возможно будет рассмотрен в следующих статьях.

Для проверки скорости реализации пар списка было написано три функции. Первая, самая очевидная:

def min_max_simple(function, values):
deltas = [b - a for a, b in function(values)]
min_span = min(deltas)
max_span = max(deltas)
total_span = sum(deltas)
average_span = total_span / float(len(deltas))
return min_span, max_span, total_span, average_span

Несмотря на то, что в первом подходе используется built-in, оптимизированные для большинства случаев функции, написанные на Си, было подозрительно четыре раза проходить по deltas. Поэтому реализовали второй алгоритм, идея которого в расчёте за один проход.

def min_max_custom(function, values):
deltas = [b - a for a, b in function(values)]
min_span = max_span = total_span = length = 0
for v in deltas:
    length += 1
    total_span += v
    if min_span > v:
        min_span = v
    elif max_span < v:
        max_span = v
average_span = total_span / float(length)
return min_span, max_span, total_span, average_span

Ещё один способ, показанный здесь http://code.activestate.com/recipes/577916-fast-minmax-function/, показался интересным. Путём сокращением количества операций сравнения, повышалась производительность поиска максимума-минимума. Однако, это эффективно только для «дорогих» операций сравнения. В обсуждении утверждается, что данный подход хорошо работает и для дешёвых операции. Мы проверим это.

def min_max_expensive_cmp(function, values):
    """ Inspired by http://code.activestate.com/recipes/577916-fast-minmax-function/ """
    deltas = [b - a for a, b in function(values)]
    min_span = max_span = 0
    for a, b in izip(deltas, islice(deltas, 1, None)):
        if b < a:
            a, b = b, a
        if min_span > a:
            min_span = a
        if max_span < b:
            max_span = b
    total_span = sum(deltas)
    average_span = total_span / float(len(deltas))
    return min_span, max_span, total_span, average_span

Сравним производительность функций поиска минимума и максимума на самом быстром алгоритме создания пар, предварительно объединив в один модуль исследуемые функции. Медленную генераторную функцию для пар сравнивать не будем, так как очевидно — создание списка будет медленнее.

# coding: utf8
from itertools import izip, islice, tee
n = 999999
t = range(n)

def pairs1(data):
    return izip(data, islice(data, 1, None))

def min_max_simple(function, values):
    deltas = [b - a for a, b in function(values)]
    min_span = min(deltas)
    max_span = max(deltas)
    total_span = sum(deltas)
    average_span = total_span / float(len(deltas))
    return min_span, max_span, total_span, average_span

def min_max_custom(function, values):
    deltas = [b - a for a, b in function(values)]
    min_span = max_span = total_span = length = 0
    for v in deltas:
       length += 1
       total_span += v
       if min_span > v:
           min_span = v
       elif max_span < v:
           max_span = v
    average_span = total_span / float(length)
    return min_span, max_span, total_span, average_span


def min_max_expensive_cmp(function, values):
    """ Inspired by http://code.activestate.com/recipes/577916-fast-minmax-function/ """
    deltas = [b - a for a, b in function(values)]
    min_span = max_span = 0
    for a, b in izip(deltas, islice(deltas, 1, None)):
        if b < a:
            a, b = b, a
        if min_span > a:
            min_span = a
        if max_span < b:
            max_span = b
    total_span = sum(deltas)
    average_span = total_span / float(len(deltas))
    return min_span, max_span, total_span, average_span

Результаты:

>>> %timeit min_max_simple(pairs2, t)
1 loops, best of 3: 319 ms per loop
>>> %timeit min_max_custom(pairs2, t)
1 loops, best of 3: 371 ms per loop
>>> %timeit min_max_expensive_cmp(pairs2, t)
1 loops, best of 3: 583 ms per loop

Таким образом предположение, что один проход по списку для поиска одновременно минимума и максимума будет быстрее четырех вызовов built-in соответствующих функций, не подтвердился. А так же неверным оказалось утверждение, что алгоритм поиска минимума и максимума для «дорогих» операций сравнений так же хорош и для дешёвых. Удовлетворив своё чувство перфекционизма, мы выбрали финальную реализацию для Reduce операции в Hadoop.

def pairs(data):
return zip(data, islice(data, 1, None))
def reducer(values):
deltas = [b - a for a, b in pairs(values)]
min_span = min(deltas)
max_span = max(deltas)
total_span = sum(deltas)
average_span = total_span / float(len(deltas))
return min_span, max_span, total_span, average_span