Processamento de Dados Massivos/Projeto e implementação de aplicações Big Data/Maximização de expectativas

Maximização de ExpectativasEditar

Nesta seção apresentamos o algoritmo conhecido como Maximização de Expectativas, em especial sua utilização em ambientes distribuídos que permitem o processamento de dados massivos, ou big data. Esta seção se divide em cinco seções a seguir: Descrição, Projeto, Desenvolvimento, Implementação e Avaliação. Finalmente, fazemos um breve estudo de caso do algoritmo, quando foi aplicado em uma competição, também de "big data".

Descrição da AplicaçãoEditar

Aqui iremos descrever o algoritmo conhecido como Maximização de Expectativas que, em conjunto com a maneira como iremos aplicá-lo ao cenário distribuído para o tratamento de dados massivos.

DefiniçãoEditar

O algoritmo Maximização de Expectativas, mais conhecido pelo seu nome inglês Expectation Maximization ou EM, pertece a uma classe de técnicas estatísticas para a estimativa de parâmetros para modelos estatísticos quando existem variáveis latentes (ou escondidas). Os parâmetros encontrados são estimativas por máxima verossimilhança ou maximum likelihood estimates (MLE).

Por modelo estatísticos estamos nos referindo à, basicamente, uma equação que descreve relações entre variáveis através de distribuições de probabilidade. Essas distribuições tornam as relações estocásticas ao invés de deterministas. Já variáveis latentes são variáveis que não são diretamente observadas nos dados. Dessa maneira, elas devem ser inferidas baseado nas variáveis que foram de fato observadas. A “mágica” do EM é que ele é capaz de lidar tanto com as variáveis latentes quanto com os parâmetros simultaneamente. Finalmente, a verossimilhança de um conjunto de parâmetros é, basicamente, a probabilidade desses parâmetros gerarem os resultados obtidos. O MLE é um método de inferência dos parâmetros com verossimilhança (ou probabilidade) máxima. O EM é uma generalização, portanto, do MLE em cenários que existem variáveis latentes. É usual se utilizar o logaritmo da verossimilhança (ou log-likelihood), porque a função logaritmica é uma função crescente e o logaritmo é uma operação monotônica, isto é, os parâmetros que maximizam a função original também maximizam a verossimilhança e por consequência o log-likelihood.

ContextoEditar

O algoritmo de Maximização de Expectativas foi explicado e batizado em um famoso artigo de 1977 por Arthur Dempster, Nan Laird, e Donald Rubin. quando disseram que o método já havia sido “proposto outras vezes por autores que os precederam”. Em particular, um tratamento bem detalhado do algoritmo EM para famílias exponenciais foi publicado por Rolf Sundberg em sua tese e diversos artigos, seguido de sua colaboração com Per Martin-Löf e Anders Martin-Löf. O artigo de Dempster-Laird-Rubin em 1977 generalizou o método e traçou uma análise de sua convergência para uma classe maior de problemas. Dessa maneira esse artigo estabeleceu o EM como uma importante ferramenta na análise estatística.

Entre os usos históricos mais famosos, está o de como Kevin Knight utilizou o EM para quebrar o código de uma sociedade secreta de mais de 250 anos, ao invés de depender num dicionário pré-definido, ele computou a tradução de todas as palavras russas (idioma do documento previamente misterioso) para o inglês, e para cada uma delas inventar uma chave para transformar todo documento para o inglês (Fonte).

Outro uso de importância do EM está descrito no artigo de Abhinandan Das, Mayur Datar, Ashutosh Garg e Shyam Rajaram, onde eles descrevem como o algoritmo foi utilizado, dentro da concepção do MapReduce, para a filtragem colaborativa. Outros usos importantes do algoritmo estão nas áreas de clusterização e classificação de dados. Como será demonstrado mais a frente, é possível identificar, facilmente, oportunidades de paralelização e distribuição de cargas de trabalho para ele.

AlgoritmoEditar

Como mencionado anteriormente, o algoritmo de Maximização de Expectativas é uma generalização da estimativa de máxima verossimilhança para o caso dos dados incompletos. Em particular, o EM tenta achar os parâmetros   que maximizam a probabilidade logaritmica   dos dados observados. Em linhas gerais, o problema de otimização abordado pelo EM é mais difícil que a otimização realizada pela estimativa de máxima verossimilhança. No caso onde os dados estão completos, a função objetivo   tem um único ótimo global, e que na maioria das vezes pode ser encontrado de forma fechada. Em contraste, no caso dos dados incompletos, a função   tem múltiplos máximos locais e nenhuma solução de forma fechada.

Para lidar com isso, o EM reduz a difícil tarefa de otimizar   em uma sequência de subproblemas de otimização mais simples, cujas funções objetivo tem um único máximo global que, dessa vez, podem ser encontrados de forma fechada. Esses subproblemas são escolhidos de uma maneira que garante que suas soluções correspondentes  ,  ,... e irão convergir para um ótimo local de  .

Mais especificamente, o algoritmo expectation maximization alterna entre duas fases:

  • Durante a fase E, o EM escolhe a função   que limita   inferiormente totalmente, e para a qual  ;
  • Durante a fase M, o algoritmo passa para o novo conjunto de parâmetros   que maximiza  .

A medida que o limiar inferior   se aproxima da função objetivo em  , temos que  , desta maneira, a função objetivo monotonicamente aumenta a cada iteração do EM.

Exemplo de FuncionamentoEditar

Um exemplo de um problema que pode ser resolvido através do Expectation Maximization, e que será utilizado no restante do trabalho é [1]: dadas duas moedas, cada uma delas com viés desconhecido, suponha que uma moeda aleatória é selecionada uniformemente a lançada N vezes. Esse experimento é repetido M vezes (para cada lançamento, uma das moedas é selecionada aleatoriamente). Qual o viés das moedas? Podemos identificar os conceitos explicitados anteriormente:

  • Parâmetros do modelo: Os viéses das moedas;
  • Variável latente: A seleção das moedas.

O que teria acontecido se a escolha das moedas fosse revelada? Teríamos um cenário de MLE tradicional. Os parâmetros (viéses) poderiam ser estimados através de: viés estimado da moeda A = número de coroa observadas no lançamento da moeda A / número de vezes que a moeda A foi lançada, e de maneira similar para a outra moeda. Mas como o EM funciona para o problema mencionado?

  1. Define valores iniciais para os viéses das moedas;
  2. Uma distribuiçnao de probabilidades sobre a escolha desconhecida das moedas é definida baseado nos parâmetros atuais;
  3. Novos pariametros são estimados utilizando-se o MLE baseados nas probabilidaes de distribuição encontradas no último passo;
  4. Retorne ao passo 2 (até a convergência).

A figura link apresenta um exemplo para esse cenário descrito.

RequisitosEditar

Como mencionado anteriormente, um dos primeiros requisitos é o de que exista variável latente no modelo a ser estudado, seja ela ausente na observação, ou seja ela propositalmente escondida. Além disso é necessário saber a função de distribuição de probabilidade da qual se deseja descobrir os parâmetros. Para alguns cenários essa função não é tão evidente e pode demandar certo esforço para ser corretamente definida.

Paralelizações ExistentesEditar

Abhinandan Das, Mayur Datar, Ashutosh Garg e Shyam Rajaram apresentaram uma das paralelizações mais famosas na litetura. No artigo intitulado “Google News Personalization: Scalable Online Collaborative FIltering” onde eles utilizam o Expectation Maximization para encontrar os parâmetros de máxima verossimilhança para o modelo em questão. Eles definem e isolam propriamente as fases E e M do algoritmo dentro do modelo e observam que a execução do algoritmo, para o volume de dados em questão é inviável (na época cerca de 80GiB de memória primária seria necessário). Dessa maneira eles separam usuários e itens (objetos em questão) e mapeam a etapa E para a etapa Map, e a M para a etapa Reduce. Na realidade, abordagem bastante similar é utilizada em nossa estudo de paralelização e distribuição da computação.

ProjetoEditar

Oportunidades de ParalelizaçãoEditar

Tomando o exemplo que será utilizado durante o trabalho, está apresentado na tabela abaixo o passo E do algoritmo de Maximização de Expectativas. É importante notar que cada uma das linhas representa um lançamento com uma moeda desconhecida. As duas tabelas seguintes apresentam a distribuição de probabilidade definida sobre ambas as moedas e as duas últimas representam o número de caras (H) e coroas (T) para a distribuição computada.

Lançamentos Probabilidade A Probabilidade B Expectativa A Expectativa B
H T T T H H T H T H 0.45 0.55 2.2 H, 2.2 T 2.8 H, 2.8 T
H H H H T H H H H H 0.80 0.20 7.2 H, 0.8 T 1.8 H, 0.2 T
H T H H H H H T H H 0.73 0.27 5.9 H, 1.5 T 2.1 H, 0.5 T
H T H T T T H H T T 0.35 0.65 1.4 H, 2.1 T 2.6 H, 3.9 T
T H H H T H H H T H 0.65 0.35 4.5 H, 8.6 T 2.5 H, 1.1 T

É importante notar que cada linha encapsula todas as informações necessárias (juntamente com os parâmetros estimados na iteração anterior) e pode ser calculada de maneira independente. Essa é a oportunidade para a paralelização dessa computação, extremamente adequada para a etapa Map, no modelo de programação MapReduce.

Após cada uma das probabilidades terem sido calculadas é a hora de estimar os novos parâmetros (os viéses) para cada uma das moedas. Isso se dá com a soma das duas últimas colunas e a aplicação do MLE. Novamente é fácil notar que, com o resultado da fase Map, basta aplicar a etapa Reduce do modelo MapReduce para obtermos os novos parâmetros e realizar outra etapa de MapReduce.

Padrões de Acesso aos DadosEditar

Não existem necessidades ou observações especiais de acesso aos dados além do já citado anteriormente, de que cada nó irá guardar uma fração dos dados que é de sua competência. Um único detalhe especial é de que o valor para os parâmetros da iteração anterior deve estar disponível para todos os nós.

Padrões de ComunicaçãoEditar

Aqui novamente não há detalhes especiais a serem mencionados. Cada bloco de arquivo, que foi distribuído para cada nó contém todas as informações necessárias para o processamento que ele deve realizar. Dessa maneira, não existe a possibilidade de dead-locks ou de starvation, ou qualquer outro dano causado pela ociosidade em função de comunicação não bem realizada. Os únicos dados recebidos a cada iteração do processamento distribuído são os parâmetros, e os enviados, os números esperados de caras e coroas para a função de distribuição definida.

Obviamente, funções de probabilidade calculadas no passo E devem ser repassadas no passo Reduce, mas esse detalhe é implícito ao modelo de programação MapReduce.

Linha do Tempo IntegradaEditar

A linha do tempo abaixo ilustra a linha do tempo do processamento do algoritmo de Maximização de Expectativas, como discutido anteriormente. Cada nó irá receber um ou mais linhas do dado bruto, lembrando que cada linha corresponde à um lançamento das moedas. Isso é possível graças a independência do cálculo dos valores para a distribuição de probabilidade, após a obtenção do primeiro valor para os parâmetros (feito nesse trabalho de maneira aleatória). Essa tarefa é coordenada pela etapa Map.

Em seguida, a etapa Reduce, se encarrega de agregar os valores computados para os valores esperados para a cada distribuição avaliada, para por fim, estipular novos parâmetros e recomeçar o ciclo MapReduce até que haja conversão para os valores dos parâmetros.

 
Linha do tempo para o Algoritmo EM dentro do modelo MapReduce

DesenvolvimentoEditar

Estratégias de ParalelizaçãoEditar

A solução encontrada para distribuir o algoritmo entre vários nós é bastante direta: calcular a verossimilhança de cada conjunto de lançamentos em paralelo através do map e para cada moeda calcular a nova distribuição de probabilidades a partir da verossimilhança encontrada para moeda.

Inicia-se com um conjunto de P(x)=cara para cada moeda, de forma randômica
Enquanto probabilidade de cada moeda continua mudando:
	mapper(linha,conteudo):
		[cara,coroa] = conteudo.split()
		//calcula probabilidade de que cada lancamento pertenca a uma das moedas
		for moeda in moedas:
			emits(moeda, [num_coroas, num_caras])

	reduce(moeda, jogadas[num_coroas, num_caras]):
		numero_caras = 0
		numero_coroas = 0
		for jogada in jogadas:
			numero_coroas += jogada[0]
			numero_caras += jogada[1]
		//emite novo likelihood da moeda
		probabilidade_cara = numero_caras/(numero_caras+numero_coroas)

		//emite a probabilidade de que a moeda dê cara
		emits(moeda, probabilidade_cara)

Estratégias de ArmazenamentoEditar

Contamos com somente um arquivo de entrada para alimentar o algorimto de maximização de expectativas. Neste arquivo - com tamanho que pode chegar 1gb - cada linha representa um conjunto de lançamentos feito com uma moeda desconhecida.

Inicialmente, transferimos o arquivo para o HFS (Hadoop File System) escolhendo o nível de redundância igual 2. Isso significa que nosso arquivo é separado em CHUNKS de dados e cada chunk é copiado em dois diferentes nós. Observe que cada chunk é completo, isto é, contém linhas completas do arquivo original. Isto é importante para mantermos o cálculo em paralelo e independente das verossimilhanças dos conjuntos de lançamentos e as prováveis probabilidades de cada moeda.

 
Armazenamento de arquivo de texto com redudância no HFS

Estratégias de ComunicaçãoEditar

A execução de programas em ambientes distribuídos pode trazer benefícios se o tempo para iniciação dos programas e troca de dados não forem overhead maior do que o ganho com a paralelização da tarefa.

A estratégia MapReduce é reduzir ao máximo a troca de dados entre os nós que trabalham para realizar uma tarefa. Neste paradigma, o objetivo é levar o processamento até onde os dados estão e não o inverso. Portanto, inicialmente, quando se transfere os arquivos para o HFS eles são distribuídos em chunks pelos nós disponíveis no cluster. O processamento de Map ocorre em cada um dos nós que já possuem parte dos dados. Dados são transferidos somente no caso de queda de máquina e/ou grave desbalanceamento.

Por fim, terminadas as tarefas de Map, os dados resultantes são transferidos para os worker que estão rodando as tarefas de Reduce. Seguindo a estratégia MapReduce, o recomendável é que estes dados não sejam complexos e pequenos o suficiente para não causar overhead na rede. No caso da nossa tarefa, esses dados são somente as novas contagens geradas pela verossimilhança entre o conjunto de lançamentos e a probabilidade P(cara) de cada moeda que podem ser expressas utilizando o tipo Double.

ImplementaçãoEditar

Plataformas e FerramentasEditar

Utilizamos o Hadoop File System[2] versão 1.03 para armazenamento dos nosso dados de forma distribuída e programas nossa estratégia de MapReduce utilizando a API do Apache Hadoop.

O Hadoop File System é um sistema distribuído, escalável, portável e open-source escrito em Java que suporta a execução de aplicações que exigem uma quantidade grande dados de forma distribuída. Ele foi inspirado nos artigos escritos pela Google descrevendo o Google File System [3] que também implementa o paradigma de MapReduce.

A configuração do cluster onde trabalhamos foi feita a partir do OpenStack [4], software para fácil gerenciamento de infraestruturas virtualizadas. Nosso cluster continha quatro máquinas virtuais diferentes.

Integração de Plataformas e FerramentasEditar

Para integração das plataformas, substituímos o sistema Java openSDK das máquinas virtuais ubuntu pelo sistema da Sun - para o qual o Hadoop é melhor adaptado. Utilizamos um namenode com réplica e quatro datanodes com nível de redundância igual a 2.

Utilizamos a API Java do Hadoop para criarmos nossas tarefas de MapReduce, assim como o nosso driver para iniciação dos Jobs.

Detalhes de ImplementaçãoEditar

Note que a dificuldade que temos aqui consiste em passar os novos valores de probabilidade para os maps a cada iteração. Note que precisamos passar somente P(cara), uma vez que P(coroa) pode ser dado por 1-P(cara). Para isso, utilizamos do JobConf. O JobConf é acessível a todo processo de map e reduce e permite que lhe sejam atribuídos parâmetros com valores de tipos primitivos e recuperá-los durante a execução através do método get().

public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, DoubleArrayWritable>{
		
  private double bias_1;
  private double bias_2;
		
  //recupera o valor dos das probabilidades já calculadas para cada moeda
  public void configure(JobConf job){
    probabilidade_1 = Double.parseDouble(job.get("probabilidade_1"));
    probabilidade_2 = Double.parseDouble(job.get("probabilidade_2"));
  }
  public void map(LongWritable key, Text linha, OutputCollector<IntWritable, DoubleArrayWritable> output){
    String line = value.toString();
			
    double[] caraProbabilidades = new double[2];
    caraProbabilidades[0] = probabilidade_1;
    caraProbabilidades[1] = probabilidade_2;

    //retorna dois vetores que contabilizam os prováveis lançamentos que geraram cara e coroa para cada moeda		
    DoubleArrayWritable[][] verossimilhancas = gera_verossimilhancas(line, caraProbabilidades)	
    
  
    try{
      output.collect(new IntWritable(1), verossimilhancas[0]);
      output.collect(new IntWritable(2), verrossimilhancas[1]);
    }catch(Exception e){
      System.out.println("erro");
    }
  }
}

A tarefa de reduce se encarrega de receber as contagens de arremessos de acordo com o número da moeda e gerar a nova probabilidade de P(cara) para cada moeda.

public void reduce(IntWritable moeda, Iterator<DoubleArrayWritable> lancamentos, OutputCollector<IntWritable, DoubleWritable>output, Reporter reporter){
  double numero_caras = 0;
  double numero_coroas = 0;
  while(jogadas.hasNext()){
    DoubleWritable[] lancamento = (DoubleWritable[]) lancamentos.next().get();
    numero_caras += lancamento[HEADS].get();
    numero_coroas += lancamento[TAILS].get();
  }
  try{
    Double probabilidade_cara = numero_caras/(numero_caras+numero_coroas);
    output.collect(moeda, new DoubleWritable(probabilidade_cara));
  }catch(Exception e){
    System.out.println("erro no reduce");
  }
}

O nosso driver é por onde inicia-se a execução do programa. Uma vez iniciada, a cada iteração testa-se a convergência, isto é, se a diferença entre os valore de P(cara) de cada uma das novas moedas entre iterações é menor que certo limiar.

public static void main(String[] args) throws Exception{
   //recupera probabilidades a partir de arquivo com o mesmo nome do argumento
  double probabilidade_1 = recupera_probabilidade("probabilidade_1");
  double probabilidade_2 = recupera_probabilidade("probabilidade_2");
  while(true){
    JobConf conf = new JobConf(EMD.class);
    conf.setJobName("EM");
    
    //seta probabilidades no JobConf
    job.set("probabilidade_1", Double.toString(probabilidade_1));
    job.set("probabilidade_2", Double.toString(probabilidade_2));
			
    //configuracoes de rotina 
    conf.setOutputKeyClass(IntWritable.class);
    conf.setOutputValueClass(DoubleWritable.class);
    conf.setMapOutputKeyClass(IntWritable.class);
    conf.setMapOutputValueClass(DoubleArrayWritable.class);
			
    conf.setMapperClass(Map.class);
			
    conf.setReducerClass(Reduce.class);
			
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);
			
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    FileInputFormat.setInputPaths(conf, new Path(args[1]));
			
    JobClient.runJob(conf);
			
    double prob_1_old = probabilidade_1;
    double prob_2_old = probabilidade_2;
    probabilidade_1 = recupera_probabilidade("probabilidade_1");
    probabilidade_2 = recupera_probabilidade("probabilidade_2");

    //testa convergencia
    if(Math.abs(probabilidade_1 - prob_1_old)>0.000001 && Math.abs(probabilidade_2 - prob2_old)>0.000001)
       break
}
}

AvaliaçãoEditar

Carga de TrabalhoEditar

A carga de trabalho foi gerada artificialmente. A divisão do trabalho é feita com base no conjunto de lançamentos e, portanto, um grande número de conjuntos de lançamentos foram gerados aleatoriamente para duas moedas, uma com P(cara) = 0.7 e outra com P(cara) = 0.4. O Objetivo é conseguir agrupar os conjuntos de lançamentso nos seus próprios cluster, numa mistura gaussiana, descobridno os parâmetros P(cara) de cada moeda.

Foram gerados 100,000,000,000 que foram escritos em um arquivo de texto. Cada linha representa um lançamento contendo dois inteiros: o número de caras e o número de coroas. A representação de todos os lançamentos utilizando dois inteiros de 4 bytes para cada lançamento utiliza aproximadamente 0.75Gb.

Avaliação ExperimentalEditar

A avaliação foi feita inicialmente sobre um single-node cluster rodando localmente e depois se expande para o cluster com quatro máquinas, sendo um namenode que também funciona como datanode. O arquivo contendo a lista de lançamentos é carregada inicialmente de forma a já ser particionada entre os nós antes da execução das tarefas. Mede-se o tempo de execução até a convergência para uma versão serial da aplicação e compara-se com os resultados alcançados com o algoritmo distribuído.

Análise de ResultadosEditar

[andamento]

Para um nó o processamento é local.

 
Tempo gasto para processar 100,000,000 pontos no EM

Note que o speedup é quase de 2 vezes aumentando para dois nós, mas, no entanto, ele não é tão grande aumentando-se mais nós. Isso se deve ao fato o tamanho da ram da máquina (2Gb) ser o fato limitante para processamento dos 100,000,000 pontos. Dividindo-se o workload em duas máquinas, divide-se também a quantidade de memória necessária para o processamento. No entanto, a memória deixa, então , de ser o principal gargalo.

Análise CríticaEditar

A vantagem do uso do Hadoop foi em dividir os dados desde o início utilizando o sistema distribuído e seguir a filosofia de "mandar o processamento para onde temos dados". No entanto, se tívessemos que distribuir os dados enquanto exetávamos o algoritmo de EM, provavelmente não teríamos obtidos resultados tão bons, uma vez que a rede seria um gargalo para transmissão de dados.

Referências

  1. [1], What is the expectation maximization algorithm? by Chuong B Do and Serafim Batzoglou
  2. [2],Hadoop File System (HFS)
  3. [3] The Google File System, by Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung
  4. [4], OpenStack