Apriori 算法讲解
Apriori 算法是一种经典的关联规则挖掘算法,主要用于发现数据集中的频繁项集(Frequent Itemsets),并基于这些频繁项集生成关联规则。该算法广泛应用于市场篮分析、推荐系统、股票关联分析等领域。
1. 算法背景
在关联规则挖掘中,发现数据集中项与项之间的关联关系。
在市场篮分析中,可能发现“购买牛奶的顾客也会购买面包”这样的关联规则。Apriori 算法通过逐层搜索的方式,从数据集中找出频繁项集,并基于频繁项集生成关联规则。
Apriori 算法在金融分析领域中,金融市场轮动板块容易受到多种复杂多变的因素的影响,最终表现为行业股价的某种周期现象,投资者可以通过观察行业的近期涨跌现象等规律,对股票的未来行业进行预测
2. 基本概念
Association rules analysis is a technique to uncover how items are associated to each other. There are three common ways to measure association: Support, Confidence, Lift
Support. This says how popular an itemset is, as measured by the proportion of transactions in which an itemset appears.
Confidence. This says how likely item Y is purchased when item X is purchased, expressed as {X -> Y}.
Lift. This says how likely item Y is purchased when item X is purchased, while controlling for how popular item Y is.
项集(Itemset)
- 项集是数据集中的一组项的集合。例如,在超市购物数据中,{牛奶, 面包} 是一个项集。
- 项集的大小称为项集的长度。例如,{牛奶, 面包} 的长度为 2。
频繁项集(Frequent Itemset)
- 频繁项集是指在数据集中出现次数(支持度)大于或等于某个阈值(最小支持度,min_support)的项集。
- 例如,如果最小支持度为 0.5,那么在所有交易中,{牛奶, 面包} 出现的比例超过 50%,则它是一个频繁项集。
支持度(Support)
- 支持度是指某个项集在数据集中出现的频率。
- 计算公式:
\(\text{Support}(A \rightarrow B) = \frac{|A \cup B|}{|T|}\) - ∣A∪B∣ 表示同时包含 A 和 B 的交易数量。
- ∣T∣表示总交易数量。
- 例如,如果数据集中有 100 笔交易,其中有 30 笔交易包含 {牛奶, 面包},则支持度为 0.3。
置信度(Confidence)
- 置信度是指在项集 A 出现的情况下,项集 B 也出现的概率。
- 计算公式:
\(\text{Confidence}(A \rightarrow B) = \frac{\text{Support}(A \rightarrow B)}{\text{Support}(A)} = \frac{|A \cup B|}{|A|}\) - ∣A∣ 表示包含 A 的交易数量。
- 例如,如果 {牛奶} 的支持度为 0.4,{牛奶, 面包} 的支持度为 0.3,则置信度为 0.75。
提升度(Lift)
- 提升度衡量项集 A 的出现是否依赖于项集 B 的出现。
- 计算公式:
\(\text{Lift}(A \rightarrow B) = \frac{\text{Confidence}(A \rightarrow B)}{\text{Support}(B)} = \frac{|A \cup B|}{|A| \times |B|}\) - ∣B∣表示包含 B 的交易数量。
- 提升度大于 1 表示 A 的出现与 B 的出现正相关,小于 1 表示负相关,等于 1 表示独立。
[!NOTE] 提升度(Lift)等于 1 表示独立
当提升度等于 1 时,表示项集 A 和项集 B 之间独立,即 B 的出现与 A 的是否出现无关(因为在A出现的前提下,B出现的概率与B在总集合中出现的概率一致)。换句话说,B 的出现概率与 A 是否出现无关。
在模型评估中,Lift是“运用该模型”和“未运用该模型”所得结果的比值,如果使用模型前后该项数值没有变化(即提升度等于1),说明模型运用对数据呈现没有影响。
\(\text{Lift}(A \rightarrow B) = \frac{\text{Confidence}(A \rightarrow B)}{\text{Support}(B)} = \frac{|A \cup B|}{|A| \times |B|}\)3. 算法步骤
Apriori 算法的核心思想是利用频繁项集的性质:频繁项集的子集一定是频繁的。
基于这一性质,算法通过逐层生成候选集,并筛选出频繁项集。
步骤 1:生成频繁 1-项集(Frequent 1-Itemsets)
- 扫描数据集,统计每个项的支持度,筛选出满足最小支持度的项,生成频繁 1-项集。
- 扫描数据集,统计每个项的出现次数。
- 根据最小支持度阈值,筛选出频繁 1-项集。
步骤 2:生成候选 k-项集(Candidate k-Itemsets)
- 基于频繁 (k-1)-项集,通过连接操作生成候选 k-项集。
- 连接操作:如果两个 (k-1)-项集的前 k−2k−2 项相同,则可以将它们连接生成一个 k-项集。
步骤 3:剪枝(Pruning)
- 检查候选 k-项集的所有 (k-1)-子集是否都是频繁的。如果不是,则删除该候选集。
- 对于每个候选 k-项集,检查其所有 (k-1)-子集是否存在于频繁 (k-1)-项集中。
- 如果不存在,则删除该候选集。
步骤 4:计算支持度
- 扫描数据集,计算候选 k-项集的支持度,筛选出满足最小支持度的频繁 k-项集。
- 统计候选 k-项集在数据集中的出现次数。
- 根据最小支持度阈值,筛选出频繁 k-项集。
步骤 5:重复生成和筛选
- 重复步骤 2 到步骤 4,直到无法生成新的频繁项集。
步骤 6:生成关联规则
- 基于频繁项集生成关联规则。
- 对于每个频繁项集 XX,生成所有可能的非空子集 YY。
- 计算置信度(Confidence)和提升度(Lift)。
- 筛选出满足最小置信度的关联规则。
Golang代码实现
使用Go协程实现Apriori算法确实可以带来性能上的提升,主要有以下几个优势:
- 并行计算支持度:
- 将候选项集分成多个批次
- 每个批次由独立的协程处理
- 充分利用多核CPU资源
- 性能优化点:
- 使用通道(channel)安全地收集结果
- 使用WaitGroup确保所有协程完成
- 可以根据CPU核心数动态调整协程数量
- 实现要点:
- 主要在
calculateSupportConcurrent函数中实现并发 - 使用
sync.WaitGroup控制协程同步 - 使用channel收集频繁项集结果
- 主要在
package apriori
import (
"runtime"
"sort"
"sync"
)
// ItemSet represents a set of items
type ItemSet map[string]bool
// Transaction represents a single transaction containing items
type Transaction []string
// AssociationRule represents a single association rule
type AssociationRule struct {
Antecedent ItemSet
Consequent ItemSet
Support float64
Confidence float64
}
// AprioriResult stores the final frequent itemsets and their support counts
type AprioriResult struct {
FrequentItemsets map[int][]ItemSetSupport
Rules []AssociationRule
}
// ItemSetSupport stores an itemset and its support count
type ItemSetSupport struct {
Items ItemSet
Support int
}
// ConcurrentApriori implements the Apriori algorithm with goroutines and generates association rules
func ConcurrentApriori(transactions []Transaction, minSupport, minConfidence float64) *AprioriResult {
result := &AprioriResult{
FrequentItemsets: make(map[int][]ItemSetSupport),
}
// 获取所有单个物品并初始化C1
c1 := generateC1(transactions)
// 存储所有频繁项集及其支持度,用于后续生成关联规则
allFrequentItemsets := make(map[string]float64)
transactionCount := float64(len(transactions))
k := 1
for {
// 并发计算支持度
candidateSupports := calculateSupportConcurrent(c1, transactions, minSupport)
if len(candidateSupports) == 0 {
break
}
// 保存频繁项集
result.FrequentItemsets[k] = candidateSupports
// 保存到全局频繁项集map中,用于生成关联规则
for _, itemSupport := range candidateSupports {
allFrequentItemsets[itemsetToString(itemSupport.Items)] = float64(itemSupport.Support) / transactionCount
}
// 并发生成下一轮候选项集
c1 = generateCandidatesConcurrent(candidateSupports)
if len(c1) == 0 {
break
}
k++
}
// 并发生成关联规则
result.Rules = generateAssociationRulesConcurrent(allFrequentItemsets, minConfidence)
// 按照置信度排序规则
sort.Slice(result.Rules, func(i, j int) bool {
return result.Rules[i].Confidence > result.Rules[j].Confidence
})
return result
}
// generateC1 generates the initial set of candidates of size 1
func generateC1(transactions []Transaction) []ItemSet {
items := make(map[string]bool)
for _, transaction := range transactions {
for _, item := range transaction {
items[item] = true
}
}
var c1 []ItemSet
for item := range items {
itemset := make(ItemSet)
itemset[item] = true
c1 = append(c1, itemset)
}
return c1
}
// calculateSupportConcurrent calculates support concurrently for candidate itemsets
func calculateSupportConcurrent(candidates []ItemSet, transactions []Transaction, minSupport float64) []ItemSetSupport {
if len(candidates) == 0 {
return nil
}
var wg sync.WaitGroup
supportChan := make(chan ItemSetSupport, len(candidates))
// 使用CPU核心数来确定goroutine数量
workerCount := runtime.NumCPU()
batchSize := (len(candidates) + workerCount - 1) / workerCount
for i := 0; i < len(candidates); i += batchSize {
end := i + batchSize
if end > len(candidates) {
end = len(candidates)
}
wg.Add(1)
go func(candidatesBatch []ItemSet) {
defer wg.Done()
for _, itemset := range candidatesBatch {
support := calculateSupport(itemset, transactions)
if float64(support)/float64(len(transactions)) >= minSupport {
supportChan <- ItemSetSupport{
Items: itemset,
Support: support,
}
}
}
}(candidates[i:end])
}
// 等待所有goroutine完成并关闭通道
go func() {
wg.Wait()
close(supportChan)
}()
// 收集结果
var result []ItemSetSupport
for support := range supportChan {
result = append(result, support)
}
return result
}
// calculateSupport calculates support for a single itemset
func calculateSupport(itemset ItemSet, transactions []Transaction) int {
support := 0
for _, transaction := range transactions {
if containsAll(transaction, itemset) {
support++
}
}
return support
}
// containsAll checks if a transaction contains all items in an itemset
func containsAll(transaction Transaction, itemset ItemSet) bool {
for item := range itemset {
found := false
for _, transItem := range transaction {
if item == transItem {
found = true
break
}
}
if !found {
return false
}
}
return true
}
// generateCandidatesConcurrent generates next candidates concurrently
func generateCandidatesConcurrent(frequentSets []ItemSetSupport) []ItemSet {
if len(frequentSets) < 2 {
return nil
}
var wg sync.WaitGroup
candidatesChan := make(chan ItemSet, len(frequentSets)*len(frequentSets))
// 使用CPU核心数来确定goroutine数量
workerCount := runtime.NumCPU()
batchSize := (len(frequentSets) + workerCount - 1) / workerCount
for i := 0; i < len(frequentSets); i += batchSize {
end := i + batchSize
if end > len(frequentSets) {
end = len(frequentSets)
}
wg.Add(1)
go func(start, end int) {
defer wg.Done()
for i := start; i < end; i++ {
for j := i + 1; j < len(frequentSets); j++ {
if candidate := joinItemsets(frequentSets[i].Items, frequentSets[j].Items); candidate != nil {
candidatesChan <- candidate
}
}
}
}(i, end)
}
// 等待所有goroutine完成并关闭通道
go func() {
wg.Wait()
close(candidatesChan)
}()
// 收集结果并去重
seen := make(map[string]bool)
var candidates []ItemSet
for candidate := range candidatesChan {
key := itemsetToString(candidate)
if !seen[key] {
seen[key] = true
candidates = append(candidates, candidate)
}
}
return candidates
}
// joinItemsets joins two itemsets if they can be joined
func joinItemsets(items1, items2 ItemSet) ItemSet {
// 如果两个项集的大小不同,则不能连接
if len(items1) != len(items2) {
return nil
}
// 找到不同的项
var diff string
var diffCount int
for item := range items1 {
if !items2[item] {
diff = item
diffCount++
}
}
for item := range items2 {
if !items1[item] {
diffCount++
}
}
// 如果只有一个不同项,则可以连接
if diffCount == 2 {
result := make(ItemSet)
for item := range items1 {
result[item] = true
}
for item := range items2 {
result[item] = true
}
return result
}
return nil
}
// generateAssociationRulesConcurrent generates association rules concurrently
func generateAssociationRulesConcurrent(frequentItemsets map[string]float64, minConfidence float64) []AssociationRule {
var wg sync.WaitGroup
rulesChan := make(chan AssociationRule, len(frequentItemsets)*10) // 预估每个频繁项集可能产生的规则数
itemsets := make([]ItemSet, 0, len(frequentItemsets))
for itemsetStr := range frequentItemsets {
itemset := stringToItemSet(itemsetStr)
if len(itemset) > 1 {
itemsets = append(itemsets, itemset)
}
}
// 使用CPU核心数来确定goroutine数量
workerCount := runtime.NumCPU()
batchSize := (len(itemsets) + workerCount - 1) / workerCount
for i := 0; i < len(itemsets); i += batchSize {
end := i + batchSize
if end > len(itemsets) {
end = len(itemsets)
}
wg.Add(1)
go func(itemsetBatch []ItemSet) {
defer wg.Done()
for _, itemset := range itemsetBatch {
generateRulesForItemset(itemset, frequentItemsets, minConfidence, rulesChan)
}
}(itemsets[i:end])
}
// 等待所有goroutine完成并关闭通道
go func() {
wg.Wait()
close(rulesChan)
}()
// 收集结果
var rules []AssociationRule
for rule := range rulesChan {
rules = append(rules, rule)
}
return rules
}
// generateRulesForItemset generates all valid rules for a single itemset
func generateRulesForItemset(itemset ItemSet, frequentItemsets map[string]float64, minConfidence float64, rulesChan chan<- AssociationRule) {
subsets := generateAllSubsets(itemset)
itemsetSupport := frequentItemsets[itemsetToString(itemset)]
for _, subset := range subsets {
if len(subset) == 0 || len(subset) == len(itemset) {
continue
}
// 生成后件
consequent := make(ItemSet)
for item := range itemset {
if !subset[item] {
consequent[item] = true
}
}
// 计算置信度
antecedentSupport := frequentItemsets[itemsetToString(subset)]
if antecedentSupport > 0 {
confidence := itemsetSupport / antecedentSupport
if confidence >= minConfidence {
rulesChan <- AssociationRule{
Antecedent: subset,
Consequent: consequent,
Support: itemsetSupport,
Confidence: confidence,
}
}
}
}
}
// generateAllSubsets generates all subsets of an itemset
func generateAllSubsets(itemset ItemSet) []ItemSet {
items := make([]string, 0, len(itemset))
for item := range itemset {
items = append(items, item)
}
var subsets []ItemSet
n := len(items)
for i := 0; i < (1 << n); i++ {
subset := make(ItemSet)
for j := 0; j < n; j++ {
if (i & (1 << j)) != 0 {
subset[items[j]] = true
}
}
subsets = append(subsets, subset)
}
return subsets
}
// itemsetToString converts an itemset to its string representation
func itemsetToString(itemset ItemSet) string {
if len(itemset) == 0 {
return ""
}
items := make([]string, 0, len(itemset))
for item := range itemset {
items = append(items, item)
}
sort.Strings(items)
result := items[0]
for i := 1; i < len(items); i++ {
result += "," + items[i]
}
return result
}
// stringToItemSet converts a string representation back to an itemset
func stringToItemSet(str string) ItemSet {
if str == "" {
return make(ItemSet)
}
itemset := make(ItemSet)
var item string
for _, ch := range str {
if ch == ',' {
if item != "" {
itemset[item] = true
item = ""
}
} else {
item += string(ch)
}
}
if item != "" {
itemset[item] = true
}
return itemset
}
这个实现主要有以下特点:
- 并发处理:
- 支持度计算的并发处理
- 候选项集生成的并发处理
- 关联规则生成的并发处理
- 所有并发操作都基于 CPU 核心数量动态调整
- 性能提升:
- 使用 map 进行快速查找
- 项集连接操作的优化
- 高效的字符串转换方法
- 结果去重处理
使用示例:
package main
import (
"fmt"
)
func main() {
// 创建测试数据 - 购物篮数据示例
transactions := []apriori.Transaction{
{"bread", "milk", "eggs"},
{"bread", "butter", "milk"},
{"milk", "eggs", "coffee"},
{"bread", "milk", "butter", "eggs"},
{"bread", "coffee", "butter"},
{"milk", "butter", "coffee"},
{"bread", "milk", "butter", "coffee"},
{"bread", "milk", "eggs", "butter"},
}
// 设置最小支持度和最小置信度
minSupport := 0.3 // 30% - 商品组合至少出现在30%的交易中
minConfidence := 0.7 // 70% - 规则的置信度至少为70%
// 运行Apriori算法
result := apriori.ConcurrentApriori(transactions, minSupport, minConfidence)
// 打印频繁项集
fmt.Println("频繁项集:")
for k, itemsets := range result.FrequentItemsets {
fmt.Printf("\n%d项集:\n", k)
for _, itemset := range itemsets {
fmt.Printf("项集: %v, 支持度计数: %d\n", itemsetToString(itemset.Items), itemset.Support)
}
}
// 打印关联规则
fmt.Println("\n关联规则:")
for _, rule := range result.Rules {
fmt.Printf("%v => %v (支持度: %.2f, 置信度: %.2f)\n",
itemsetToString(rule.Antecedent),
itemsetToString(rule.Consequent),
rule.Support,
rule.Confidence)
}
}
// 辅助函数:将ItemSet转换为可读字符串
func itemsetToString(items apriori.ItemSet) string {
result := "{"
first := true
for item := range items {
if !first {
result += ", "
}
result += item
first = false
}
result += "}"
return result
}
这个实现相比传统的串行实现,在处理大规模数据集时会有明显的性能提升。特别是在计算支持度这个计算密集型的步骤中,并行处理可以显著减少执行时间。
需要注意的是,性能提升的程度取决于:
- 数据集的大小
- 事务的数量
- CPU核心数
- 最小支持度阈值
对于小型数据集,由于协程创建和管理的开销,可能不会带来明显的性能提升。建议在数据集较大时使用这个并发实现。
4. 算法示例
假设有一个超市的交易数据集如下:
| 交易编号 | 购买的商品 |
|---|---|
| 1 | 牛奶, 面包 |
| 2 | 牛奶, 面包, 鸡蛋 |
| 3 | 牛奶, 鸡蛋 |
| 4 | 面包, 鸡蛋 |
| 5 | 牛奶, 面包 |
步骤 1:生成频繁 1-项集
- 统计每个商品的支持度:
- 牛奶:4/5 = 0.8
- 面包:4/5 = 0.8
- 鸡蛋:3/5 = 0.6
- 假设最小支持度为 0.5,则频繁 1-项集为 {牛奶}, {面包}, {鸡蛋}。
步骤 2:生成候选 2-项集
- 连接频繁 1-项集:
- {牛奶, 面包}
- {牛奶, 鸡蛋}
- {面包, 鸡蛋}
步骤 3:剪枝
- 检查子集是否频繁:
- {牛奶, 面包}:子集 {牛奶}, {面包} 都是频繁的。
- {牛奶, 鸡蛋}:子集 {牛奶}, {鸡蛋} 都是频繁的。
- {面包, 鸡蛋}:子集 {面包}, {鸡蛋} 都是频繁的。
步骤 4:计算支持度
- 计算候选 2-项集的支持度:
- {牛奶, 面包}:3/5 = 0.6
- {牛奶, 鸡蛋}:2/5 = 0.4
- {面包, 鸡蛋}:2/5 = 0.4
- 筛选出频繁 2-项集:{牛奶, 面包}。
步骤 5:生成关联规则
- 基于频繁 2-项集 {牛奶, 面包},生成关联规则:
- {牛奶} → {面包}:置信度 = 3/4 = 0.75
- {面包} → {牛奶}:置信度 = 3/4 = 0.75
5. 改进算法
虽然Apriori 算法的基本思想和实现步骤非常清晰,然而该算法随着项集规模的增加,候选集的数量呈指数增长,导致计算开销较大;并且算法需要多次扫描数据集以计算支持度,涉及到大数据集时效率较低。
为了克服 Apriori 算法的缺点,后续提出了一些改进算法,例如:
- FP-Growth 算法:通过构建频繁模式树(FP-Tree)来减少数据集扫描次数。
- Eclat 算法:使用垂直数据格式和交集操作来加速频繁项集的挖掘。