Processamento de Dados Massivos/Projeto e implementação de aplicações Big Data/Maximização de expectativas: diferenças entre revisões

[edição não verificada][edição não verificada]
Conteúdo apagado Conteúdo adicionado
Linha 106:
===Estratégias de Paralelização===
 
A solução encontrada para distribuir o algoritmo entre vários nós é bastante direta: calcular a verossimilhança de cada conjunto de lançamentos em paralelo através do map e para cada moeda calcular a nova distribuição de probabilidades a partir da verossimilhança encontrada para moeda.
 
 
Note que a dificuldade que temos aqui consiste em passar os novos valores de probabilidade para os maps a cada iteração. Note que precisamos passar somente P(cara), uma vez que P(coroa) pode ser dado por 1-P(cara). Para isso, utilizamos do JobConf. O JobConf é acessível a todo processo de map e reduce e permite que lhe sejam atribuídos parâmetros com valores de tipos primitivos e recuperá-los durante a execução através do método get().
 
<syntaxhighlight lang="Java">
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, DoubleArrayWritable>{
private double bias_1;
private double bias_2;
//recupera o valor dos das probabilidades já calculadas para cada moeda
public void configure(JobConf job){
probabilidade_1 = Double.parseDouble(job.get("probabilidade_1"));
probabilidade_2 = Double.parseDouble(job.get("probabilidade_2"));
}
public void map(LongWritable key, Text linha, OutputCollector<IntWritable, DoubleArrayWritable> output){
String line = value.toString();
double[] caraProbabilidades = new double[2];
caraProbabilidades[0] = probabilidade_1;
caraProbabilidades[1] = probabilidade_2;
 
//retorna dois vetores que contabilizam os prováveis lançamentos que geraram cara e coroa para cada moeda
DoubleArrayWritable[][] verossimilhancas = gera_verossimilhancas(line, caraProbabilidades)
try{
output.collect(new IntWritable(1), verossimilhancas[0]);
output.collect(new IntWritable(2), verrossimilhancas[1]);
}catch(Exception e){
System.out.println("erro");
}
}
}
</syntaxhighlight>
 
 
 
<syntaxhighlight lang="Java">
 
public void reduce(IntWritable moeda, Iterator<DoubleArrayWritable> lancamentos, OutputCollector<IntWritable, DoubleWritable>output, Reporter reporter){
double numero_caras = 0;
double numero_coroas = 0;
while(jogadas.hasNext()){
DoubleWritable[] lancamento = (DoubleWritable[]) lancamentos.next().get();
numero_caras += lancamento[HEADS].get();
numero_coroas += lancamento[TAILS].get();
}
try{
Double probabilidade_cara = numero_caras/(numero_caras+numero_coroas);
output.collect(moeda, new DoubleWritable(probabilidade_cara));
}catch(Exception e){
System.out.println("erro no reduce");
}
}
 
</syntaxhighlight>
 
O nosso driver é por onde inicia-se a execução do programa. Uma vez iniciada, a cada iteração testa-se a convergência, isto é, se a diferença entre os valore de P(cara) de cada uma das novas moedas entre iterações é menor que certo limiar.
 
<syntaxhighlight lang="Java">
public static void main(String[] args) throws Exception{
//recupera probabilidades a partir de arquivo com o mesmo nome do argumento
double probabilidade_1 = recupera_probabilidade("probabilidade_1");
double probabilidade_2 = recupera_probabilidade("probabilidade_2");
while(true){
JobConf conf = new JobConf(EMD.class);
conf.setJobName("EM");
//seta probabilidades no JobConf
job.set("probabilidade_1", Double.toString(probabilidade_1));
job.set("probabilidade_2", Double.toString(probabilidade_2));
//configuracoes de rotina
conf.setOutputKeyClass(IntWritable.class);
conf.setOutputValueClass(DoubleWritable.class);
conf.setMapOutputKeyClass(IntWritable.class);
conf.setMapOutputValueClass(DoubleArrayWritable.class);
conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileInputFormat.setInputPaths(conf, new Path(args[1]));
JobClient.runJob(conf);
double prob_1_old = probabilidade_1;
double prob_2_old = probabilidade_2;
probabilidade_1 = recupera_probabilidade("probabilidade_1");
probabilidade_2 = recupera_probabilidade("probabilidade_2");
 
//testa convergencia
if(Math.abs(probabilidade_1 - prob_1_old)>0.000001 && Math.abs(probabilidade_2 - prob2_old)>0.000001)
break
}
}
</syntaxhighlight>
 
===Estratégias de Armazenamento===