Desenvolvimento

24 fev, 2012

Classificador Naive bayes em 50 linhas

Publicidade

O classificador Naive Bayes é um dos algoritmos de aprendizado de máquina mais versáteis que eu já vi durante minha escassa experiência como estudante de graduação, e eu gostaria de fazer uma implementação de brincadeira. Em sua essência, a implementação é reduzida a uma forma de contagem, e o módulo Python inteiro, incluindo uma ferramenta de teste, exigiu apenas 50 linhas de código. Na verdade, eu ainda não avaliei o desempenho, então qualquer comentário é bem-vindo. Sou um amador em Python e tenho certeza de que hackers experientes em Python podem aparar algumas arestas deste código.

Definição e design

Aqui está a definição da funcionalidade do classificador (tirado da Wikipedia):

O que significa que, para cada possível rótulo de classe, multiplica-se a probabilidade condicional de cada característica. Ou seja, para se implementar o classificador, tudo que precisamos fazer é calcular essas probabilidades condicionais individuais para cada rótulo, para cada característica, p(Fi | Cj), e multiplicá-las juntamente com a probabilidade anterior para aquele rótulo p(Cj). O rótulo para o qual conseguirmos o maior produto é o rótulo retornado pelo classificador.

Para calcular essas probabilidades condicionais individuais, nós utilizamos o método de estimativa por máxima verossimilhança (Maximum Likelihood Estimation – MLE). De forma muito simples, nós aproximamos essas probabilidades utilizando os contadores dos vetores de entrada/treino.

Daí temos: p(Fi | Cj) = count(Fi ^ Cj) / count(Cj)

Ou seja, a partir do corpus de treino, nós contamos a taxa do número de ocorrências conjuntas da característica Fi e o rótulo Cj, com o número de ocorrências do rótulo Cj.

Problema da probabilidade zero

E se nós nunca tivéssemos visto uma característica específica Fa e um rótulo específico Cb juntos na base de treino? Sempre que eles ocorrerem no conjunto de teste, a probabilidade p(Fa | Cb) será zero. Daí, o produto global também será zero. Esse é um problema com estimativas por máxima verossimilhança. Só porque uma observação específica não ocorreu durante o treino, não significa que ela nunca ocorrerá nos dados de teste. Visando remediar esse problema, nós utilizamos o que é conhecido como suavização (smoothing). A forma mais simples de suavização, a qual utilizamos no código, é chamada suavização pela adição de um (add one smoothing). Essencialmente, a probabilidade para um evento não visto deveria ser maior que um. Nós conseguimos isso adicionando um a cada contagem nula. O efeito sobre a formulação é a redistribuição de um pouco da massa de probabilidades, alterando observações com contagem não-nula para observações com contagem nula. Sendo assim, nós também precisamos aumentar a contagem total para cada rótulo com o número de possíveis observações, para manter a massa total de probabilidades em 1.

Por exemplo, se nós temos duas classes C = 0 e C = 1, então, após a suavização, as probabilidades suavizadas por MLE podem ser escritas como:

p-smoothed(Fi | Cj) = [count(Fi ^ Cj) + 1]/[count(Cj) + N], onde N é o número total de observações dentro de todas as características no corpus de treino.

Código

Para simplificar, nós vamos utilizar o formato de arquivo ARFF do Weka como entrada. Nós temos uma classe única chamada Model que possui alguns dicionários e listas para armazenar os contadores e os detalhes do vetor de características. Nesta implementação, nós lidamos somente com características de valores discretos.

from __future__ import division
import collections
import math

class Model:
def __init__(self, arffFile):
self.trainingFile = arffFile
self.features = {} #all feature names and their possible values (including the class label)
self.featureNameList = [] #this is to maintain the order of features as in the arff
self.featureCounts = collections.defaultdict(lambda: 1)#contains tuples of the form (label, feature_name, feature_value)
self.featureVectors = [] #contains all the values and the label as the last entry
self.labelCounts = collections.defaultdict(lambda: 0) #these will be smoothed later

O dicionário features salva todos os possíveis valores de uma característica. featureNameList é simplesmente uma lista que contém os nomes das características na mesma ordem em que elas aparecem no arquivo ARFF. Isso acontece porque nosso dicionário de características não possui uma ordem intrínseca, e precisamos manter a ordem das características explicitamente. featureCounts contém os contatores reais de co-ocorrência de cada valor de característica com cada valor de rótulo.

As chaves para esse dicionário são tuplas na seguinte forma (<rótulo da classe>, <nome da característica>, <valor da característica>). Daí, se nós observamos a característica F1 com o valor ‘x’ para o rótulo ‘sim’ quinze vezes, então nós teremos a entrada {(‘sim’, ‘F1’, 15)} no dicionário. Note que o valor default para contadores nesse dicionário é ‘1’ em vez de ‘0’. Isso é porque nós estamos suavizando os contadores. A lista featureVectors na verdade contém todos os vetores de características de entrada do arquivo ARFF. A última característica nesse vetor é o próprio rótulo da classe, de acordo com a convenção de arquivos ARFF do Weka. Por fim, labelCounts armazena os contadores dos rótulos das classes, isto é, quantas vezes nós vimos o rótulo Ci durante o treino.

ERRATA: Acho q tá errado (class_label, feature_name, feature_value) no artigo original. Deveria ser (class_label, feature_name, feature_count) já que no exemplo ele colocou {(‘yes’, ‘F1?, 15)} ao invés de {(‘yes’, ‘F1?, ‘x’)}.

Nós também temos os seguintes métodos na classe Model:

def GetValues(self):
file = open(self.trainingFile, 'r')
for line in file:
if line[0] != '@': #start of actual data
self.featureVectors.append(line.strip().lower().split(','))
else: #feature definitions
if line.strip().lower().find('@data') == -1 and (not line.lower().startswith('@relation')):
self.featureNameList.append(line.strip().split()[1])
self.features[self.featureNameList[len(self.featureNameList) - 1]] = line[line.find('{')+1: line.find('}')].strip().split(',')
file.close()

O método acima simplesmente lê os nomes das características (incluindo os rótulos das classes), seus possíveis valores e os vetores de características propriamente; e popula as apropriadas estruturas de dados definidas acima.

def TrainClassifier(self):
for fv in self.featureVectors:
self.labelCounts[fv[len(fv)-1]] += 1 #udpate count of the label
for counter in range(0, len(fv)-1):
self.featureCounts[(fv[len(fv)-1], self.featureNameList[counter], fv[counter])] += 1

for label in self.labelCounts: #increase label counts (smoothing). remember that the last feature is actually the label
for feature in self.featureNameList[:len(self.featureNameList)-1]:
self.labelCounts[label] += len(self.features[feature])

O método TrainClassifier simplesmente conta o número de co-ocorrência (?) de cada valor de característica com cada rótulo de classe, e os armazena na forma de triplas. Esses contadores são automaticamente suavizados utilizando a suavização pela adição de um, já que o valor default do contador nesse dicionário é ‘1’. Os contadores dos rótulos são também ajustados incrementando-os pelo número total de observações.

def Classify(self, featureVector): #featureVector is a simple list like the ones that we use to train
probabilityPerLabel = {} #store the final probability for each class label
for label in self.labelCounts:
logProb = 0
for featureValue in featureVector:
logProb += math.log(self.featureCounts[(label, self.featureNameList[featureVector.index(featureValue)], featureValue)]/self.labelCounts[label])
probabilityPerLabel[label] = (self.labelCounts[label]/sum(self.labelCounts.values())) * math.exp(logProb)
print probabilityPerLabel
return max(probabilityPerLabel, key = lambda classLabel: probabilityPerLabel[classLabel])

Por fim, nós temos o método Classify, que aceita como argumento um único vetor de características (como uma lista), e calcula (?) o produto das probabilidades condicionais individuais (suavizadas por MLE) para cada rótulo. As probabilidades finais calculadas para cada rótulo são armazenadas no dicionário probabilityPerLabel. Na última linha, nós retornamos a entrada de probabilityPerLabel, que apresenta a maior probabilidade. Note que a multiplicação é na verdade feita como uma adição no domínio de log, já que os números envolvidos são extremamente pequenos. Além disso, um dos fatores utilizados nessa multiplicação é a probabilidade a priori (?) de ter esse rótulo de classe.

Aqui está o código completo, incluindo um método de teste:

#Author: Krishnamurthy Koduvayur Viswanathan

from __future__ import division
import collections
import math

class Model:
def __init__(self, arffFile):
self.trainingFile = arffFile
self.features = {} #all feature names and their possible values (including the class label)
self.featureNameList = [] #this is to maintain the order of features as in the arff
self.featureCounts = collections.defaultdict(lambda: 1)#contains tuples of the form (label, feature_name, feature_value)
self.featureVectors = [] #contains all the values and the label as the last entry
self.labelCounts = collections.defaultdict(lambda: 0) #these will be smoothed later

def TrainClassifier(self):
for fv in self.featureVectors:
self.labelCounts[fv[len(fv)-1]] += 1 #udpate count of the label
for counter in range(0, len(fv)-1):
self.featureCounts[(fv[len(fv)-1], self.featureNameList[counter], fv[counter])] += 1

for label in self.labelCounts: #increase label counts (smoothing). remember that the last feature is actually the label
for feature in self.featureNameList[:len(self.featureNameList)-1]:
self.labelCounts[label] += len(self.features[feature])

def Classify(self, featureVector): #featureVector is a simple list like the ones that we use to train
probabilityPerLabel = {}
for label in self.labelCounts:
logProb = 0
for featureValue in featureVector:
logProb += math.log(self.featureCounts[(label, self.featureNameList[featureVector.index(featureValue)], featureValue)]/self.labelCounts[label])
probabilityPerLabel[label] = (self.labelCounts[label]/sum(self.labelCounts.values())) * math.exp(logProb)
print probabilityPerLabel
return max(probabilityPerLabel, key = lambda classLabel: probabilityPerLabel[classLabel])

def GetValues(self):
file = open(self.trainingFile, 'r')
for line in file:
if line[0] != '@': #start of actual data
self.featureVectors.append(line.strip().lower().split(','))
else: #feature definitions
if line.strip().lower().find('@data') == -1 and (not line.lower().startswith('@relation')):
self.featureNameList.append(line.strip().split()[1])
self.features[self.featureNameList[len(self.featureNameList) - 1]] = line[line.find('{')+1: line.find('}')].strip().split(',')
file.close()

def TestClassifier(self, arffFile):
file = open(arffFile, 'r')
for line in file:
if line[0] != '@':
vector = line.strip().lower().split(',')
print "classifier: " + self.Classify(vector) + " given " + vector[len(vector) - 1]

if __name__ == "__main__":
model = Model("/home/tennis.arff")
model.GetValues()
model.TrainClassifier()
model.TestClassifier("/home/tennis.arff")

Baixe o arquivo de exemplo ARFF para testar.

Update: Eu encontrei um bug na última e única linha da função GetValues(). Essa linha pega os possíveis valores de atributo do arquivo ARFF e os armazena em self.featureNameList. Esse método não lida com espaços em branco corretamente. Altere essa linha para:

self.features[self.featureNameList[len(self.featureNameList) - 1]] = [featureName.strip() for featureName in line[line.find('{')+1: line.find('}')].strip().split(',')]?

Texto original disponível em http://ebiquity.umbc.edu/blogger/2010/12/07/naive-bayes-classifier-in-50-lines/