-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path crewai_workflow.py
157 lines (130 loc) · 5 KB
/
crewai_workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import pandas as pd
from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool
import os
# Placeholder OpenAI key for the CrewAI agents defined later in this file
# (assumed to need one for their LLM calls — confirm). setdefault keeps a real
# key that is already exported in the environment instead of overwriting it.
os.environ.setdefault("OPENAI_API_KEY", "your_openai_api_key_here")
# Initialize SerperDevTool for enrichment. It reads its own credential,
# presumably SERPER_API_KEY rather than the OpenAI key — TODO confirm.
serper_tool = SerperDevTool()
# Step 1: Normalize Data
def normalize_data(data):
    """
    Prepare the data for processing by cleaning and standardizing columns.

    Drops export-artifact columns whose label starts with "Unnamed", checks
    that the required columns are present, and parses "Date" to datetime so
    it can be sorted and grouped.

    Args:
        data: pandas DataFrame of transactions.

    Returns:
        A new DataFrame; the caller's frame is not modified.

    Raises:
        ValueError: if any of "Date", "Amount", "Description" is missing.
    """
    # Compare on the string form of every label: the ``.str`` accessor fails
    # (or yields NaN) for non-string labels such as integers.
    is_unnamed = data.columns.astype(str).str.startswith("Unnamed")
    # .copy() so the "Date" assignment below never writes into a view of the
    # caller's frame (and does not trigger SettingWithCopyWarning).
    data = data.loc[:, ~is_unnamed].copy()

    required_columns = ["Date", "Amount", "Description"]
    missing_columns = [col for col in required_columns if col not in data.columns]
    if missing_columns:
        raise ValueError(f"The following required columns are missing: {', '.join(missing_columns)}")

    # Convert date column to datetime for easier grouping
    data["Date"] = pd.to_datetime(data["Date"])
    return data
# Step 2: Detect Recurring Charges
def detect_recurring_charges(data, min_occurrences=2):
    """
    Detect potential recurring charges by grouping descriptions and amounts.

    A charge is treated as recurring when the same (Description, Amount) pair
    appears at least ``min_occurrences`` times. Both fields must match exactly.

    Args:
        data: DataFrame with at least "Description", "Amount" and "Date".
        min_occurrences: minimum number of repeats. The default of 2 keeps the
            original "occurs more than once" rule.

    Returns:
        The matching original rows plus a "Frequency" column holding the count
        of their (Description, Amount) group, sorted by Description then Date.

    Raises:
        ValueError: if ``min_occurrences`` is less than 1.
    """
    if min_occurrences < 1:
        raise ValueError("min_occurrences must be at least 1")

    # Count how often each (Description, Amount) pair occurs.
    counts = (
        data.groupby(["Description", "Amount"])
        .size()
        .reset_index(name="Frequency")
    )
    counts = counts[counts["Frequency"] >= min_occurrences]

    # Inner merge keeps only rows whose pair passed the threshold and attaches
    # the Frequency column to each of them.
    recurring_data = data.merge(
        counts, on=["Description", "Amount"], how="inner"
    ).sort_values(by=["Description", "Date"])
    return recurring_data
# Step 3: Enrich Merchant Data
def enrich_merchant_data(data):
    """
    Enrich recurring transactions by inferring merchant details from descriptions.

    Adds a "Merchant" column, inferred from "Description" with
    infer_merchant_from_description, when one is not already present. Also
    adds a "Category" column: "Subscription" for any merchant other than
    "Unknown Merchant", otherwise "Unknown".

    Args:
        data: DataFrame with a "Description" column (or an existing "Merchant").

    Returns:
        A new DataFrame with the added columns; the input frame is left unmodified.
    """
    # Work on a copy so the caller's DataFrame does not gain columns as a side effect.
    enriched = data.copy()
    if "Merchant" not in enriched.columns:
        enriched["Merchant"] = enriched["Description"].apply(infer_merchant_from_description)
    # Add enrichment context (e.g., category)
    enriched["Category"] = enriched["Merchant"].apply(
        lambda merchant: "Subscription" if merchant != "Unknown Merchant" else "Unknown"
    )
    return enriched
# Ordered (keyword, merchant) pairs; the first keyword found wins. Keywords are
# lower-case because descriptions are case-folded before matching.
_MERCHANT_KEYWORDS = (
    ("netflix", "Netflix"),
    ("spotify", "Spotify"),
    ("gym", "Local Gym"),
)


def infer_merchant_from_description(description):
    """
    Infer merchant information based on transaction description.

    Uses simple case-insensitive keyword matching, so "NETFLIX.COM" and
    "Netflix" both map to "Netflix". Replace with actual enrichment logic
    (e.g. a tool/API lookup) as needed.

    Args:
        description: transaction description. Non-string values such as None
            or NaN are treated as unmatched rather than raising.

    Returns:
        The merchant name, or "Unknown Merchant" when no keyword matches.
    """
    if not isinstance(description, str):
        return "Unknown Merchant"
    text = description.casefold()
    for keyword, merchant in _MERCHANT_KEYWORDS:
        if keyword in text:
            return merchant
    return "Unknown Merchant"
# Step 4: Full Workflow
def run_crewai_workflow(data):
    """
    Run the normalize -> detect -> enrich pipeline over a transactions DataFrame.

    Returns the enriched recurring transactions. Returns None, after printing
    a message, when no recurring transactions are found or when any step
    raises; exceptions are reported, not propagated.
    """
    try:
        cleaned = normalize_data(data)
        candidates = detect_recurring_charges(cleaned)
        if not candidates.empty:
            return enrich_merchant_data(candidates)
        print("No recurring transactions detected.")
        return None
    except Exception as exc:
        # Best-effort entry point: report the failure and signal it with None.
        print(f"Error detecting recurring subscriptions: {exc}")
        return None
# Step 5: Integration with CrewAI
# One agent per pipeline stage. Only the enrichment agent is given a tool
# (the Serper search tool); the pandas helpers above are not wired in as
# tools, and no kickoff call is visible here — NOTE(review): confirm how the
# crew is meant to receive the transaction data.
normalizer_agent = Agent(
    role="Data Normalizer",
    backstory="Responsible for ensuring clean and consistent data for analysis.",
    goal="Prepare and standardize transaction data for processing.",
    verbose=True,
    tools=[],
)
recurring_detector_agent = Agent(
    role="Recurring Charge Detector",
    backstory="Skilled in detecting patterns in financial transactions.",
    goal="Identify potential recurring transactions indicative of subscriptions.",
    verbose=True,
    tools=[],
)
enrichment_agent = Agent(
    role="Enrichment Agent",
    backstory="Uses advanced tools to enrich transaction details for better insights.",
    goal="Enrich transaction data by identifying merchants and subscription details.",
    verbose=True,
    tools=[serper_tool],
)

# Each task is bound to the agent responsible for that stage.
normalization_task = Task(
    agent=normalizer_agent,
    description="Normalize and prepare transaction data.",
    expected_output="Cleaned and standardized data ready for analysis.",
)
recurring_detection_task = Task(
    agent=recurring_detector_agent,
    description="Analyze data to detect potential recurring transactions.",
    expected_output="A list of recurring transactions and their frequencies.",
)
enrichment_task = Task(
    agent=enrichment_agent,
    description="Enrich recurring transactions with merchant and category details.",
    expected_output="Recurring transactions enriched with merchant and subscription data.",
)

# The crew executes the tasks one after another, in list order.
crew = Crew(
    process=Process.sequential,
    agents=[normalizer_agent, recurring_detector_agent, enrichment_agent],
    tasks=[normalization_task, recurring_detection_task, enrichment_task],
)