Apriori 算法是一种经典的关联规则挖掘算法,主要用于发现数据集中的频繁项集(Frequent Itemsets),并基于这些频繁项集生成关联规则。该算法广泛应用于市场篮分析、推荐系统、股票关联分析等领域。

1. 算法背景

在关联规则挖掘中,发现数据集中项与项之间的关联关系。

在市场篮分析中,可能发现“购买牛奶的顾客也会购买面包”这样的关联规则。Apriori 算法通过逐层搜索的方式,从数据集中找出频繁项集,并基于频繁项集生成关联规则。

Apriori 算法在金融分析领域中,金融市场轮动板块容易受到多种复杂多变的因素的影响,最终表现为行业股价的某种周期现象,投资者可以通过观察行业的近期涨跌现象等规律,对股票的未来行业进行预测

2. 基本概念

Association rules analysis is a technique to uncover how items are associated to each other. There are three common ways to measure association: Support, Confidence, Lift

Support. This says how popular an itemset is, as measured by the proportion of transactions in which an itemset appears.
Confidence. This says how likely item Y is purchased when item X is purchased, expressed as {X -> Y}.
Lift. This says how likely item Y is purchased when item X is purchased, while controlling for how popular item Y is.

项集(Itemset)

  • 项集是数据集中的一组项的集合。例如,在超市购物数据中,{牛奶, 面包} 是一个项集。
  • 项集的大小称为项集的长度。例如,{牛奶, 面包} 的长度为 2。

    频繁项集(Frequent Itemset)

  • 频繁项集是指在数据集中出现次数(支持度)大于或等于某个阈值(最小支持度,min_support)的项集。
  • 例如,如果最小支持度为 0.5,那么在所有交易中,{牛奶, 面包} 出现的比例超过 50%,则它是一个频繁项集。

    支持度(Support)

  • 支持度是指某个项集在数据集中出现的频率。
  • 计算公式:
    \(\text{Support}(A \rightarrow B) = \frac{|A \cup B|}{|T|}\)
  • ∣A∪B∣ 表示同时包含 A 和 B 的交易数量。
  • ∣T∣表示总交易数量。
  • 例如,如果数据集中有 100 笔交易,其中有 30 笔交易包含 {牛奶, 面包},则支持度为 0.3。

    置信度(Confidence)

  • 置信度是指在项集 A 出现的情况下,项集 B 也出现的概率。
  • 计算公式:
    \(\text{Confidence}(A \rightarrow B) = \frac{\text{Support}(A \rightarrow B)}{\text{Support}(A)} = \frac{|A \cup B|}{|A|}\)
  • ∣A∣ 表示包含 A 的交易数量。
  • 例如,如果 {牛奶} 的支持度为 0.4,{牛奶, 面包} 的支持度为 0.3,则置信度为 0.75。

    提升度(Lift)

  • 提升度衡量项集 A 的出现是否依赖于项集 B 的出现。
  • 计算公式:
    \(\text{Lift}(A \rightarrow B) = \frac{\text{Confidence}(A \rightarrow B)}{\text{Support}(B)} = \frac{|A \cup B|}{|A| \times |B|}\)
  • ∣B∣表示包含 B 的交易数量。
  • 提升度大于 1 表示 A 的出现与 B 的出现正相关,小于 1 表示负相关,等于 1 表示独立。

[!NOTE] 提升度(Lift)等于 1 表示独立
当提升度等于 1 时,表示项集 A 和项集 B 之间独立,即 B 的出现与 A 的是否出现无关(因为在A出现的前提下,B出现的概率与B在总集合中出现的概率一致)。换句话说,B 的出现概率与 A 是否出现无关。
在模型评估中,Lift是“运用该模型”和“未运用该模型”所得结果的比值,如果使用模型前后该项数值没有变化(即提升度等于1),说明模型运用对数据呈现没有影响。
\(\text{Lift}(A \rightarrow B) = \frac{\text{Confidence}(A \rightarrow B)}{\text{Support}(B)} = \frac{|A \cup B|}{|A| \times |B|}\)

3. 算法步骤

Apriori 算法的核心思想是利用频繁项集的性质:频繁项集的子集一定是频繁的。
基于这一性质,算法通过逐层生成候选集,并筛选出频繁项集。

步骤 1:生成频繁 1-项集(Frequent 1-Itemsets)

  • 扫描数据集,统计每个项的支持度,筛选出满足最小支持度的项,生成频繁 1-项集。
    • 扫描数据集,统计每个项的出现次数。
    • 根据最小支持度阈值,筛选出频繁 1-项集。

      步骤 2:生成候选 k-项集(Candidate k-Itemsets)

  • 基于频繁 (k-1)-项集,通过连接操作生成候选 k-项集。
  • 连接操作:如果两个 (k-1)-项集的前 k−2k−2 项相同,则可以将它们连接生成一个 k-项集。

    步骤 3:剪枝(Pruning)

  • 检查候选 k-项集的所有 (k-1)-子集是否都是频繁的。如果不是,则删除该候选集。
    • 对于每个候选 k-项集,检查其所有 (k-1)-子集是否存在于频繁 (k-1)-项集中。
    • 如果不存在,则删除该候选集。

      步骤 4:计算支持度

  • 扫描数据集,计算候选 k-项集的支持度,筛选出满足最小支持度的频繁 k-项集。
    • 统计候选 k-项集在数据集中的出现次数。
    • 根据最小支持度阈值,筛选出频繁 k-项集。

      步骤 5:重复生成和筛选

  • 重复步骤 2 到步骤 4,直到无法生成新的频繁项集。

    步骤 6:生成关联规则

  • 基于频繁项集生成关联规则。
    • 对于每个频繁项集 XX,生成所有可能的非空子集 YY。
    • 计算置信度(Confidence)和提升度(Lift)。
    • 筛选出满足最小置信度的关联规则。

Golang代码实现

使用Go协程实现Apriori算法确实可以带来性能上的提升,主要有以下几个优势:

  1. 并行计算支持度:
    • 将候选项集分成多个批次
    • 每个批次由独立的协程处理
    • 充分利用多核CPU资源
  2. 性能优化点:
    • 使用通道(channel)安全地收集结果
    • 使用WaitGroup确保所有协程完成
    • 可以根据CPU核心数动态调整协程数量
  3. 实现要点:
    • 主要在calculateSupportConcurrent函数中实现并发
    • 使用sync.WaitGroup控制协程同步
    • 使用channel收集频繁项集结果
package apriori  
  
import (  
    "runtime"  
    "sort"  
    "sync"  
)  
  
// ItemSet represents a set of items  
type ItemSet map[string]bool  
  
// Transaction represents a single transaction containing items  
type Transaction []string  
  
// AssociationRule represents a single association rule  
type AssociationRule struct {  
    Antecedent ItemSet  
    Consequent ItemSet  
    Support    float64  
    Confidence float64  
}  
  
// AprioriResult stores the final frequent itemsets and their support counts  
type AprioriResult struct {  
    FrequentItemsets map[int][]ItemSetSupport  
    Rules            []AssociationRule  
}  
  
// ItemSetSupport stores an itemset and its support count  
type ItemSetSupport struct {  
    Items   ItemSet  
    Support int  
}  
  
// ConcurrentApriori implements the Apriori algorithm with goroutines and generates association rules  
func ConcurrentApriori(transactions []Transaction, minSupport, minConfidence float64) *AprioriResult {  
    result := &AprioriResult{  
        FrequentItemsets: make(map[int][]ItemSetSupport),  
    }  
      
    // 获取所有单个物品并初始化C1  
    c1 := generateC1(transactions)  
      
    // 存储所有频繁项集及其支持度,用于后续生成关联规则  
    allFrequentItemsets := make(map[string]float64)  
    transactionCount := float64(len(transactions))  
      
    k := 1  
    for {  
        // 并发计算支持度  
        candidateSupports := calculateSupportConcurrent(c1, transactions, minSupport)  
        if len(candidateSupports) == 0 {  
            break  
        }  
          
        // 保存频繁项集  
        result.FrequentItemsets[k] = candidateSupports  
          
        // 保存到全局频繁项集map中,用于生成关联规则  
        for _, itemSupport := range candidateSupports {  
            allFrequentItemsets[itemsetToString(itemSupport.Items)] = float64(itemSupport.Support) / transactionCount  
        }  
          
        // 并发生成下一轮候选项集  
        c1 = generateCandidatesConcurrent(candidateSupports)  
        if len(c1) == 0 {  
            break  
        }  
        k++  
    }  
      
    // 并发生成关联规则  
    result.Rules = generateAssociationRulesConcurrent(allFrequentItemsets, minConfidence)  
      
    // 按照置信度排序规则  
    sort.Slice(result.Rules, func(i, j int) bool {  
        return result.Rules[i].Confidence > result.Rules[j].Confidence  
    })  
      
    return result  
}  
  
// generateC1 generates the initial set of candidates of size 1  
func generateC1(transactions []Transaction) []ItemSet {  
    items := make(map[string]bool)  
    for _, transaction := range transactions {  
        for _, item := range transaction {  
            items[item] = true  
        }  
    }  
      
    var c1 []ItemSet  
    for item := range items {  
        itemset := make(ItemSet)  
        itemset[item] = true  
        c1 = append(c1, itemset)  
    }  
    return c1  
}  
  
// calculateSupportConcurrent calculates support concurrently for candidate itemsets  
func calculateSupportConcurrent(candidates []ItemSet, transactions []Transaction, minSupport float64) []ItemSetSupport {  
    if len(candidates) == 0 {  
        return nil  
    }  
      
    var wg sync.WaitGroup  
    supportChan := make(chan ItemSetSupport, len(candidates))  
      
    // 使用CPU核心数来确定goroutine数量  
    workerCount := runtime.NumCPU()  
    batchSize := (len(candidates) + workerCount - 1) / workerCount  
      
    for i := 0; i < len(candidates); i += batchSize {  
        end := i + batchSize  
        if end > len(candidates) {  
            end = len(candidates)  
        }  
          
        wg.Add(1)  
        go func(candidatesBatch []ItemSet) {  
            defer wg.Done()  
            for _, itemset := range candidatesBatch {  
                support := calculateSupport(itemset, transactions)  
                if float64(support)/float64(len(transactions)) >= minSupport {  
                    supportChan <- ItemSetSupport{  
                        Items:   itemset,  
                        Support: support,  
                    }  
                }  
            }  
        }(candidates[i:end])  
    }  
      
    // 等待所有goroutine完成并关闭通道  
    go func() {  
        wg.Wait()  
        close(supportChan)  
    }()  
      
    // 收集结果  
    var result []ItemSetSupport  
    for support := range supportChan {  
        result = append(result, support)  
    }  
      
    return result  
}  
  
// calculateSupport calculates support for a single itemset  
func calculateSupport(itemset ItemSet, transactions []Transaction) int {  
    support := 0  
    for _, transaction := range transactions {  
        if containsAll(transaction, itemset) {  
            support++  
        }  
    }  
    return support  
}  
  
// containsAll checks if a transaction contains all items in an itemset  
func containsAll(transaction Transaction, itemset ItemSet) bool {  
    for item := range itemset {  
        found := false  
        for _, transItem := range transaction {  
            if item == transItem {  
                found = true  
                break  
            }  
        }  
        if !found {  
            return false  
        }  
    }  
    return true  
}  
  
// generateCandidatesConcurrent generates next candidates concurrently  
func generateCandidatesConcurrent(frequentSets []ItemSetSupport) []ItemSet {  
    if len(frequentSets) < 2 {  
        return nil  
    }  
      
    var wg sync.WaitGroup  
    candidatesChan := make(chan ItemSet, len(frequentSets)*len(frequentSets))  
      
    // 使用CPU核心数来确定goroutine数量  
    workerCount := runtime.NumCPU()  
    batchSize := (len(frequentSets) + workerCount - 1) / workerCount  
      
    for i := 0; i < len(frequentSets); i += batchSize {  
        end := i + batchSize  
        if end > len(frequentSets) {  
            end = len(frequentSets)  
        }  
          
        wg.Add(1)  
        go func(start, end int) {  
            defer wg.Done()  
            for i := start; i < end; i++ {  
                for j := i + 1; j < len(frequentSets); j++ {  
                    if candidate := joinItemsets(frequentSets[i].Items, frequentSets[j].Items); candidate != nil {  
                        candidatesChan <- candidate  
                    }  
                }  
            }  
        }(i, end)  
    }  
      
    // 等待所有goroutine完成并关闭通道  
    go func() {  
        wg.Wait()  
        close(candidatesChan)  
    }()  
      
    // 收集结果并去重  
    seen := make(map[string]bool)  
    var candidates []ItemSet  
    for candidate := range candidatesChan {  
        key := itemsetToString(candidate)  
        if !seen[key] {  
            seen[key] = true  
            candidates = append(candidates, candidate)  
        }  
    }  
      
    return candidates  
}  
  
// joinItemsets joins two itemsets if they can be joined  
func joinItemsets(items1, items2 ItemSet) ItemSet {  
    // 如果两个项集的大小不同,则不能连接  
    if len(items1) != len(items2) {  
        return nil  
    }  
      
    // 找到不同的项  
    var diff string  
    var diffCount int  
      
    for item := range items1 {  
        if !items2[item] {  
            diff = item  
            diffCount++  
        }  
    }  
      
    for item := range items2 {  
        if !items1[item] {  
            diffCount++  
        }  
    }  
      
    // 如果只有一个不同项,则可以连接  
    if diffCount == 2 {  
        result := make(ItemSet)  
        for item := range items1 {  
            result[item] = true  
        }  
        for item := range items2 {  
            result[item] = true  
        }  
        return result  
    }  
      
    return nil  
}  
  
// generateAssociationRulesConcurrent generates association rules concurrently  
func generateAssociationRulesConcurrent(frequentItemsets map[string]float64, minConfidence float64) []AssociationRule {  
    var wg sync.WaitGroup  
    rulesChan := make(chan AssociationRule, len(frequentItemsets)*10) // 预估每个频繁项集可能产生的规则数  
      
    itemsets := make([]ItemSet, 0, len(frequentItemsets))  
    for itemsetStr := range frequentItemsets {  
        itemset := stringToItemSet(itemsetStr)  
        if len(itemset) > 1 {  
            itemsets = append(itemsets, itemset)  
        }  
    }  
      
    // 使用CPU核心数来确定goroutine数量  
    workerCount := runtime.NumCPU()  
    batchSize := (len(itemsets) + workerCount - 1) / workerCount  
      
    for i := 0; i < len(itemsets); i += batchSize {  
        end := i + batchSize  
        if end > len(itemsets) {  
            end = len(itemsets)  
        }  
          
        wg.Add(1)  
        go func(itemsetBatch []ItemSet) {  
            defer wg.Done()  
            for _, itemset := range itemsetBatch {  
                generateRulesForItemset(itemset, frequentItemsets, minConfidence, rulesChan)  
            }  
        }(itemsets[i:end])  
    }  
      
    // 等待所有goroutine完成并关闭通道  
    go func() {  
        wg.Wait()  
        close(rulesChan)  
    }()  
      
    // 收集结果  
    var rules []AssociationRule  
    for rule := range rulesChan {  
        rules = append(rules, rule)  
    }  
      
    return rules  
}  
  
// generateRulesForItemset generates all valid rules for a single itemset  
func generateRulesForItemset(itemset ItemSet, frequentItemsets map[string]float64, minConfidence float64, rulesChan chan<- AssociationRule) {  
    subsets := generateAllSubsets(itemset)  
    itemsetSupport := frequentItemsets[itemsetToString(itemset)]  
      
    for _, subset := range subsets {  
        if len(subset) == 0 || len(subset) == len(itemset) {  
            continue  
        }  
          
        // 生成后件  
        consequent := make(ItemSet)  
        for item := range itemset {  
            if !subset[item] {  
                consequent[item] = true  
            }  
        }  
          
        // 计算置信度  
        antecedentSupport := frequentItemsets[itemsetToString(subset)]  
        if antecedentSupport > 0 {  
            confidence := itemsetSupport / antecedentSupport  
            if confidence >= minConfidence {  
                rulesChan <- AssociationRule{  
                    Antecedent: subset,  
                    Consequent: consequent,  
                    Support:    itemsetSupport,  
                    Confidence: confidence,  
                }  
            }  
        }  
    }  
}  
  
// generateAllSubsets generates all subsets of an itemset  
func generateAllSubsets(itemset ItemSet) []ItemSet {  
    items := make([]string, 0, len(itemset))  
    for item := range itemset {  
        items = append(items, item)  
    }  
      
    var subsets []ItemSet  
    n := len(items)  
    for i := 0; i < (1 << n); i++ {  
        subset := make(ItemSet)  
        for j := 0; j < n; j++ {  
            if (i & (1 << j)) != 0 {  
                subset[items[j]] = true  
            }  
        }  
        subsets = append(subsets, subset)  
    }  
      
    return subsets  
}  
  
// itemsetToString converts an itemset to its string representation  
func itemsetToString(itemset ItemSet) string {  
    if len(itemset) == 0 {  
        return ""  
    }  
      
    items := make([]string, 0, len(itemset))  
    for item := range itemset {  
        items = append(items, item)  
    }  
    sort.Strings(items)  
      
    result := items[0]  
    for i := 1; i < len(items); i++ {  
        result += "," + items[i]  
    }  
    return result  
}  
  
// stringToItemSet converts a string representation back to an itemset  
func stringToItemSet(str string) ItemSet {  
    if str == "" {  
        return make(ItemSet)  
    }  
      
    itemset := make(ItemSet)  
    var item string  
    for _, ch := range str {  
        if ch == ',' {  
            if item != "" {  
                itemset[item] = true  
                item = ""  
            }  
        } else {  
            item += string(ch)  
        }  
    }  
    if item != "" {  
        itemset[item] = true  
    }  
    return itemset  
}  

这个实现主要有以下特点:

  1. 并发处理:
    • 支持度计算的并发处理
    • 候选项集生成的并发处理
    • 关联规则生成的并发处理
    • 所有并发操作都基于 CPU 核心数量动态调整
  2. 性能提升:
    • 使用 map 进行快速查找
    • 项集连接操作的优化
    • 高效的字符串转换方法
    • 结果去重处理

使用示例:

package main  
  
import (  
    "fmt"  
)  
  
func main() {  
    // 创建测试数据 - 购物篮数据示例  
    transactions := []apriori.Transaction{  
        {"bread", "milk", "eggs"},  
        {"bread", "butter", "milk"},  
        {"milk", "eggs", "coffee"},  
        {"bread", "milk", "butter", "eggs"},  
        {"bread", "coffee", "butter"},  
        {"milk", "butter", "coffee"},  
        {"bread", "milk", "butter", "coffee"},  
        {"bread", "milk", "eggs", "butter"},  
    }  
  
    // 设置最小支持度和最小置信度  
    minSupport := 0.3    // 30% - 商品组合至少出现在30%的交易中  
    minConfidence := 0.7 // 70% - 规则的置信度至少为70%  
  
    // 运行Apriori算法  
    result := apriori.ConcurrentApriori(transactions, minSupport, minConfidence)  
  
    // 打印频繁项集  
    fmt.Println("频繁项集:")  
    for k, itemsets := range result.FrequentItemsets {  
        fmt.Printf("\n%d项集:\n", k)  
        for _, itemset := range itemsets {  
            fmt.Printf("项集: %v, 支持度计数: %d\n", itemsetToString(itemset.Items), itemset.Support)  
        }  
    }  
  
    // 打印关联规则  
    fmt.Println("\n关联规则:")  
    for _, rule := range result.Rules {  
        fmt.Printf("%v => %v (支持度: %.2f, 置信度: %.2f)\n",  
            itemsetToString(rule.Antecedent),  
            itemsetToString(rule.Consequent),  
            rule.Support,  
            rule.Confidence)  
    }  
}  
  
// 辅助函数:将ItemSet转换为可读字符串  
func itemsetToString(items apriori.ItemSet) string {  
    result := "{"  
    first := true  
    for item := range items {  
        if !first {  
            result += ", "  
        }  
        result += item  
        first = false  
    }  
    result += "}"  
    return result  
}  

这个实现相比传统的串行实现,在处理大规模数据集时会有明显的性能提升。特别是在计算支持度这个计算密集型的步骤中,并行处理可以显著减少执行时间。

需要注意的是,性能提升的程度取决于:

  1. 数据集的大小
  2. 事务的数量
  3. CPU核心数
  4. 最小支持度阈值

对于小型数据集,由于协程创建和管理的开销,可能不会带来明显的性能提升。建议在数据集较大时使用这个并发实现。

4. 算法示例

假设有一个超市的交易数据集如下:

交易编号 购买的商品
1 牛奶, 面包
2 牛奶, 面包, 鸡蛋
3 牛奶, 鸡蛋
4 面包, 鸡蛋
5 牛奶, 面包

步骤 1:生成频繁 1-项集

  • 统计每个商品的支持度:
    • 牛奶:4/5 = 0.8
    • 面包:4/5 = 0.8
    • 鸡蛋:3/5 = 0.6
  • 假设最小支持度为 0.5,则频繁 1-项集为 {牛奶}, {面包}, {鸡蛋}。

步骤 2:生成候选 2-项集

  • 连接频繁 1-项集:
    • {牛奶, 面包}
    • {牛奶, 鸡蛋}
    • {面包, 鸡蛋}

步骤 3:剪枝

  • 检查子集是否频繁:
    • {牛奶, 面包}:子集 {牛奶}, {面包} 都是频繁的。
    • {牛奶, 鸡蛋}:子集 {牛奶}, {鸡蛋} 都是频繁的。
    • {面包, 鸡蛋}:子集 {面包}, {鸡蛋} 都是频繁的。

步骤 4:计算支持度

  • 计算候选 2-项集的支持度:
    • {牛奶, 面包}:3/5 = 0.6
    • {牛奶, 鸡蛋}:2/5 = 0.4
    • {面包, 鸡蛋}:2/5 = 0.4
  • 筛选出频繁 2-项集:{牛奶, 面包}。

步骤 5:生成关联规则

  • 基于频繁 2-项集 {牛奶, 面包},生成关联规则:
    • {牛奶} → {面包}:置信度 = 3/4 = 0.75
    • {面包} → {牛奶}:置信度 = 3/4 = 0.75

      5. 改进算法

虽然Apriori 算法的基本思想和实现步骤非常清晰,然而该算法随着项集规模的增加,候选集的数量呈指数增长,导致计算开销较大;并且算法需要多次扫描数据集以计算支持度,涉及到大数据集时效率较低。

为了克服 Apriori 算法的缺点,后续提出了一些改进算法,例如:

  • FP-Growth 算法:通过构建频繁模式树(FP-Tree)来减少数据集扫描次数。
  • Eclat 算法:使用垂直数据格式和交集操作来加速频繁项集的挖掘。

<
Previous Post
来杯红茶-Facade模式
>
Next Post
Obsidian callouts使用方法