Processamento Paralelo e Iterativo para Recomendações de Aprendizado de Máquina com Spark






O texto a seguir é uma tradução livre de um tutorial da MapR que achei muito interessante.
Espero que também ache :)


Sistemas de recomendação ajudam a estreitar suas escolhas para aquelas que melhor atendem às suas necessidades, estando entre os mais populares aplicativos de processamento de Big Data. Neste post vamos discutir a construção de um modelo de recomendação de classificação de filmes, semelhante a estes dois artigos: Um Olhar Interno aos Componentes de um Mecanismo de Recomendação e Sistema de Recomendação com Mahout e Elasticsearch, mas desta vez usando um algoritmo iterativo e processamento paralelo com Apache Spark MLlib.

Neste post vamos abordar:

  1. Uma diferença fundamental entre Spark e MapReduce, que faz o Spark muito mais rápido para algoritmos iterativos.
  2. Filtragem colaborativa para recomendações com Spark.
  3. Carregando e explorando amostras de dados definidos com Spark.
  4. Usando o algoritmo Spark MLib (Alternância de Quadrados Mínimos) para fazer recomendações de filmes.
  5. Testando os resultados das recomendações.
Uma diferença fundamental entre Spark e MapReduce



O Spark é especialmente útil para o processamento paralelo de dados distribuídos com algoritmos iterativos. Conforme discutido no Guia de 5 minutos para compreender o significado do Apache
Spark, o Spark tenta manter os dados na memória, ao passo que o MapReduce envolve mais leitura e escrita de disco. Como mostrado na imagem abaixo, para cada MapReduce Job, os dados são lidos de um arquivo HDFS para um mapeador, escrito para e de um SequenceFile e depois gravados em um arquivo de um Redutor. Quando uma cadeia de múltiplas tarefas é necessária, o Spark pode executar muito mais rapidamente, mantendo os dados na memória. Para o registro, há benefícios em gravar no disco, sendo este mais tolerante a falhas do que a memória.


Leitura de Partições de Dados RDDS através RAM ao invés do Disco


Os dados distribuídos resilientes do Spark, RDDS, são um conjunto de elementos particionados entre os nós de um cluster que podem ser operados em paralelo. RDDS pode ser criado a partir de arquivos HDFS e pode ser armazenado em cache, permitindo reutilização em operações paralelas.


O diagrama abaixo mostra uma aplicação Spark rodando em um exemplo cluster do Hadoop. A tarefa aplica-se a sua unidade de trabalho para os elementos RDD em sua partição e gera uma nova partição, uma vez que algoritmos iterativos aplicam operações repetidamente aos dados, se beneficiando do RDDS em memória, armazenando em cache através de iterações.


Filtragem Colaborativa com Spark

Algoritmos de filtragem colaborativa recomendam itens (esta é a parte de filtragem) com base em informações de preferência de muitos usuários (esta é a parte em colaboração). A abordagem de filtragem colaborativa baseia-se na semelhança; a ideia básica é que as pessoas que gostavam de itens semelhantes no passado, vão gostar de itens semelhantes no futuro. No exemplo abaixo, Ted gosta dos filmes A, B, e C. Carol gosta dos filmes B e C. Bob gosta do filme B. Para recomendar um filme a Bob, calculamos que os usuários que gostavam de B também gostaram de C, então C é uma possível recomendação para Bob. Naturalmente, este é um pequeno exemplo. Em situações reais, teríamos muito mais dados para trabalhar.

O Spark MLlib implementa um algoritmo de filtragem colaborativa chamado Alternância de Quadrados Mínimos (ALS, sigla em inglês).

O ALS aproxima a matriz escassa de classificação de itens de usuário de dimensão K como o produto de duas matrizes densas, User e Item, matrizes fatoriais de tamanho U x K e I x K (veja a imagem abaixo). As matrizes de fator (ou fatoriais) também são chamadas de modelos de recursos latentes. Elas representam características escondidas que o algoritmo tenta descobrir. Uma matriz tenta descrever as características latentes ou ocultas de cada usuário e outra tenta descrever as propriedades latentes de cada filme.


O ALS é um algoritmo iterativo. Em cada iteração, o algoritmo alternativamente corrige uma matriz de fatores e resolve para o outro e este processo continua até convergir. Esta alternância entre os quais a matriz é otimizada, é a origem do nome "alternância".

Software

  • Este tutorial será executado no MapR v5 Sandbox, que inclui o Spark.
  • Você pode baixar o código e dados para executar esses exemplos aqui:
  • Código: https://github.com/caroljmcdonald/
  • Dados: http://files.grouplens.org/datasets/movielens/ml-1m.zip
  • O exemplo neste post pode ser executado no spark-shell; após rodar com o comando spark-shell, basta copiar e colar o código das caixas de código abaixo.
  • Você também pode executar o código como um aplicativo independente, conforme descrito no tutorial sobre Introdução ao Spark no MapR Sandbox.

Fluxo de Trabalho Típico de Aprendizagem de Máquina

Um fluxo de trabalho típico de aprendizagem de máquina é mostrado abaixo.

Neste tutorial vamos realizar os seguintes passos:

  1. Carregar os dados de amostra.
  2. Analisar os dados em formato de entrada para o algoritmo do ALS.
  3. Dividir os dados em duas partes, uma para a construção do modelo e uma para testar o modelo.
  4. Executar o algoritmo ALS para construir/treinar um modelo de matriz de produto de usuário.
  5. Fazer previsões com os dados de treinamento e observar os resultados.
  6. Testar o modelo com os dados de teste.

O Conjunto de Amostras

A tabela abaixo mostra os campos de Avaliação de dados com alguns dados de exemplo:

A tabela abaixo mostra os campos de dados de cinema com alguns dados de exemplo:

Primeiro vamos explorar os dados usando Spark Data frames com perguntas como:

  • Conte os ratings máximo e mínimo juntamente com o número de usuários que avaliaram um filme.
  • Exibir o título para filmes com classificações > 4


Carregando dados no Spark Dataframe

Conecte-se no MapR Sandbox, conforme explicado em Introdução ao Spark no MapR Sandbox, usando ID do usuário “user01” ID e senha “mapr”. Copie os arquivos de dados de amostras para seu diretório sandbox home /user/user01 usando scp. Inicie o spark shell com

$ spark-shell


Primeiro vamos importar alguns pacotes e instanciar um SqlContext, que é o ponto de entrada para trabalhar com dados estruturados (linhas e colunas) em Spark e permite a criação de objetos DataFrame.


(Nas caixas de código, os comentários estão em verde e a saída em azul)

// SQLContext entry point for working with structured data
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Import Spark SQL data types
import org.apache.spark.sql._
// Import mllib recommendation data types
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}

A seguir nós usamos classes de caso Scala para definir o filme e esquemas de usuários correspondentes aos arquivos movies.dat e users.dat.

// input format MovieID::Title::Genres
case class Movie(movieId: Int, title: String, genres: Seq[String])

// input format is UserID::Gender::Age::Occupation::Zip-code
case class User(userId: Int, gender: String, age: Int, occupation: Int, zip: String)

As funções abaixo analisam uma linha a partir dos arquivos movie.dat, user.dat e rating.dat nas classes de cinema e de usuário correspondentes.

// function to parse input into Movie class
def parseMovie(str: String): Movie = {
val fields = str.split("::")
assert(fields.size == 3)
Movie(fields(0).toInt, fields(1))
}
// function to parse input into User class
def parseUser(str: String): User = {
val fields = str.split("::")
assert(fields.size == 5)
User(fields(0).toInt, fields(1).toString, fields(2).toInt,fields(3).toInt, fields(4).toString)

A seguir nós carregamos os dados do arquivo ratings.dat em um conjunto de dados distribuído Resiliente (RDD). O RDDS pode ter transformações e ações. A primeira ação retorna o primeiro elemento na RDD, que é a String "1 :: 1193 :: 5 :: 978300760"

// load the data into a RDD
val ratingText = sc.textFile("/user/user01/moviemed/ratings.dat")
// MapPartitionsRDD[1] at textFile

// Return the first element in this RDD
ratingText.first()
// String = 1::1193::5::978300760

Nós usamos a classe org.apache.spark.mllib.recommendation.Rating para analisar o arquivo ratings.dat. Mais tarde, vamos usar a classe Rating como entrada para o método de execução ALS.

Usamos a transformação map no ratingText, que será aplicável a função parseRating para cada elemento em ratingText, retornando um novo RDD de objetos de rating. Nós armazenamos em cache os dados de rating, já que vamos usar esses dados para construir o modelo de matriz. Então nós começamos a contagem para o número de classificações, filmes e usuários.


// function to parse input UserID::MovieID::Rating
// Into org.apache.spark.mllib.recommendation.Rating class
def parseRating(str: String): Rating= {
val fields = str.split("::")
Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
}

// create an RDD of Ratings objects
val ratingsRDD = ratingText.map(parseRating).cache()
//ratingsRDD: org.apache.spark.mllib.recommendation.Rating] = MapPartitionsRDD

// count number of total ratings
val numRatings = ratingsRDD.count()
//numRatings: Long = 1000209

// count number of movies rated
val numMovies = ratingsRDD.map(_.product).distinct().count()
//numMovies: Long = 3706

// count number of users who rated a movie
val numUsers = ratingsRDD.map(_.user).distinct().count()
//numUsers: Long = 6040


Explorar e consultar Movie Lens Data com DataFrames Spark
Spark SQL fornece uma abstração de programação chamada DataFrames. Um DataFrame é uma coleção distribuída de dados organizados em colunas nomeadas. Spark suporta converter automaticamente um RDD contendo case classes para um DataFrame com o método toDF, e o case class define o esquema da tabela.

Abaixo, nós carregamos os dados dos usuários e arquivos de dados de filmes em uma RDD, usando a transformação map com as funções de análise. Em seguida, chamamos a função toDF() que retorna um DataFrame para o RDD. Na sequência, registramos os DataFrames como tabelas temporárias para que possamos usá-las nas instruções SQL.


// load the data into DataFrames
val usersDF = sc.textFile("/user/user01/moviemed/users.dat").map(parseUser).toDF()
val moviesDF = sc.textFile("/user/user01/moviemed/movies.dat").map(parseMovie).toDF()

// create a DataFrame from the ratingsRDD
val ratingsDF = ratingsRDD.toDF()

// register the DataFrames as a temp table
ratingsDF.registerTempTable("ratings")
moviesDF.registerTempTable("movies")
usersDF.registerTempTable("users")


DataFrame printSchema() que imprime o esquema para o console em formato de árvore.


// Return the schema of this DataFrame
usersDF.printSchema()
root
|-- userId: integer (nullable = false)
|-- gender: string (nullable = true)
|-- age: integer (nullable = false)
|-- occupation: integer (nullable = false)
|-- zip: string (nullable = true)

moviesDF.printSchema()
root
|-- movieId: integer (nullable = false)
|-- title: string (nullable = true)

ratingsDF.printSchema()
root
|-- user: integer (nullable = false)
|-- product: integer (nullable = false)
|-- rating: double (nullable = false) |-- zip: string (nullable = true)
Here are some example queries using Spark SQL with DataFrames on the Movie Lens data. The first query gets the maximum and minimum ratings along with the count of users who have rated a movie.
// Get the max, min ratings along with the count of users who have rated a movie.
val results =sqlContext.sql("select movies.title, movierates.maxr, movierates.minr, movierates.cntu from(SELECT ratings.product, max(ratings.rating) as maxr, min(ratings.rating) as minr,count(distinct user) as cntu FROM ratings group by ratings.product ) movierates join movies on movierates.product=movies.movieId order by movierates.cntu desc ")

// DataFrame show() displays the top 20 rows in tabular form
results.show()
title maxr minr cntu
American Beauty (... 5.0 1.0 3428
Star Wars: Episod... 5.0 1.0 2991
Star Wars: Episod... 5.0 1.0 2990
Star Wars: Episod... 5.0 1.0 2883
Jurassic Park (1993) 5.0 1.0 2672
Saving Private Ry... 5.0 1.0 2653


A consulta a seguir encontra os usuários que classificaram a maioria dos filmes, encontrando filmes que o usuário mais ativo nominal classificou como maior que 4. Vamos obter recomendações para este usuário mais tarde.

// Show the top 10 most-active users and how many times they rated a movie
val mostActiveUsersSchemaRDD = sqlContext.sql("SELECT ratings.user, count(*) as ct from ratings group by ratings.user order by ct desc limit 10")

println(mostActiveUsersSchemaRDD.collect().mkString("\n"))
[4169,2314]
[1680,1850]
[4277,1743]
. . .
// Find the movies that user 4169 rated higher than 4
val results =sqlContext.sql("SELECT ratings.user, ratings.product, ratings.rating, movies.title FROM ratings JOIN movies ON movies.movieId=ratings.product where ratings.user=4169 and ratings.rating > 4")

results.show
user product rating title
4169 1231 5.0 Right Stuff, The ...
4169 232 5.0 Eat Drink Man Wom...
4169 3632 5.0 Monsieur Verdoux ...
4169 2434 5.0 Down in the Delta...
4169 1834 5.0 Spanish Prisoner,... …
Usando ALS para Construir um MatrixFactorizationModel com os dados de Classificação de Filmes

Agora vamos usar o algoritmo MLlib ALS para aprender os fatores latentes que podem ser usados para prever entradas ausentes na matriz de associação usuário-item. Em primeiro lugar, separamos as classificações de dados em dados de treino (80%) e os dados de teste (20%). Nós vamos obter recomendações para os dados de treinamento e então vamos avaliar as previsões com os dados de teste. Este processo de tomar um subconjunto dos dados para construir o modelo e, em seguida, a verificação do modelo com os dados restantes são conhecidos como validação cruzada, e o objetivo é o de estimar a precisão com que um modelo preditivo vai realizar na prática. Para melhorar o modelo, este processo muitas vezes é feito várias vezes com diferentes subgrupos, mas nós só iremos fazê-lo uma vez.


Executamos ALS na entrada trainingRDD de Avaliação (usuário, produto, classificação) com os parâmetros rank e interation:

  • rank é o número de factores latentes no modelo.
  • interation é o número de iterações para executar.

O método run ALS (trainingRDD) vai construir e retornar um MatrixFactorizationModel, que pode ser usado para fazer previsões de produtos para usuários.

// Randomly split ratings RDD into training data RDD (80%) and test data RDD (20%)
val splits = ratingsRDD.randomSplit(Array(0.8, 0.2), 0L)

val trainingRatingsRDD = splits(0).cache()
val testRatingsRDD = splits(1).cache()

val numTraining = trainingRatingsRDD.count()
val numTest = testRatingsRDD.count()
println(s"Training: $numTraining, test: $numTest.")
//Training: 800702, test: 199507.

// build a ALS user product matrix model with rank=20, iterations=10
val model = (new ALS().setRank(20).setIterations(10).run(trainingRatingsRDD))

Fazer Previsões com a MatrixFactorizationModel

Agora podemos usar o MatrixFactorizationModel para fazer previsões. Primeiro vamos obter previsões de filmes para o usuário mais ativo, 4169, com o método recommendProducts(), que toma como entrada o ID do usuário e o número de produtos para recomendar. Em seguida, imprimimos os títulos de filmes recomendados.

// Get the top 4 movie predictions for user 4169
val topRecsForUser = model.recommendProducts(4169, 5)
// get movie titles to show with recommendations
val movieTitles=moviesDF.map(array => (array(0), array(1))).collectAsMap()
// print out top recommendations for user 4169 with titles
topRecsForUser.map(rating => (movieTitles(rating.product), rating.rating)).foreach(println)
(Other Side of Sunday) (1996),5.481923568209796)
(Shall We Dance? (1937),5.435728723311838)
(42 Up (1998),5.3596886655841995)
(American Dream (1990),5.291663089739282)

Avaliando o Modelo

Em seguida, vamos comparar as previsões do modelo com classificações reais no testRatingsRDD. Em primeiro lugar temos os pares de produtos do usuário do testRatingsRDD para passar para o MatrixFactorizationModel método predict (user: Int, product: Int), que irá retornar previsões de Avaliação (usuário, produto, classificação).

// get user product pair from testRatings
val testUserProductRDD = testRatingsRDD.map {
case Rating(user, product, rating) => (user, product)
}
// get predicted ratings to compare to test ratings
val predictionsForTestRDD = model.predict(testUserProductRDD)

predictionsForTestRDD.take(10).mkString("\n")
Rating(5874,1084,4.096802264684769)
Rating(6002,1084,4.884270180173981)

Agora vamos comparar as previsões de teste com as avaliações de teste reais. Primeiro vamos colocar as previsões e os RDDS teste nessa chave, value pair format for joining: ((user, product), rating). Em seguida, imprimir o (usuário, produto), (test rating, predict rating) para comparação.

/ prepare predictions for comparison
val predictionsKeyedByUserProductRDD = predictionsForTestRDD.map{
case Rating(user, product, rating) => ((user, product), rating)
}
// prepare test for comparison
val testKeyedByUserProductRDD = testRatingsRDD.map{
case Rating(user, product, rating) => ((user, product), rating)
}

//Join the test with predictions
val testAndPredictionsJoinedRDD = testKeyedByUserProductRDD.join(predictionsKeyedByUserProductRDD)

// print the (user, product),( test rating, predicted rating)
testAndPredictionsJoinedRDD.take(3).mkString("\n")
((455,1254),(4.0,4.48399986469759))
((2119,1101),(4.0,3.83955683816239))

((1308,1042),(4.0,2.616444598335322))

O exemplo abaixo encontra falsos positivos, encontrando classificações previstas que foram> = 4, quando a classificação de teste real era <= 1. Havia 557 falsos positivos fora de 199,507 avaliações de teste.

val falsePositives =(testAndPredictionsJoinedRDD.filter{
case ((user, product), (ratingT, ratingP)) => (ratingT <= 1 && ratingP >=4)
})
falsePositives.take(2)
Array[((Int, Int), (Double, Double))] =
((3842,2858),(1.0,4.106488210964762)),
((6031,3194),(1.0,4.790778049100913))

falsePositives.count
res23: Long = 557

Em seguida, vamos avaliar o modelo usando o erro médio absoluto (MAE). O MAE é a diferença absoluta entre os objetivos previstos e reais.

//Evaluate the model using Mean Absolute Error (MAE) between test and predictions
val meanAbsoluteError = testAndPredictionsJoinedRDD.map {
case ((user, product), (testRating, predRating)) =>
val err = (testRating - predRating)
Math.abs(err)
}.mean()
meanAbsoluteError: Double = 0.7244940545944053

Conclusão

Isto conclui o tutorial sobre Processamento Paralelo e Iterativo para Recomendações de Aprendizado de Máquina com o Spark.

Para saber mais

  1. Baixe o ebook sobre soluções para Big Data que escrevi;
  2. Se inscreva na lista que criei para discutir o tema no Google Groups;
  3. Confira minha palestra virtual apresentando os conceitos básicos da tecnologia, depois venha trocar idéias!

Christian Guerreiro

Professor por vocação, blogueiro e servidor público por opção, amante da tecnologia e viciado em informação.


Ensino a distância em Tecnologia da Informação: Virtualização com VMware, Big Data com Hadoop, Certificação ITIL 2011 Foundations e muito mais.


Suporte o Tecnologia que Interessa!

Você acha que as informações compartilhadas aqui são úteis?
Então me ajude a produzir ainda mais e melhores conteúdos!


É muito fácil. Basta divulgar nossos treinamentos pra alguém que conheça!


E se for de Salvador, podemos estruturar um curso presencial para sua empresa!

Eu vou ficar muito grato (e quem fizer os curso também :)!