Full dataset preparation

drop_duplicate_columns_for_merge(summary, rfm)

Removes duplicate columns from DataFrames before merging.

Parameters:

| Name    | Type      | Description        | Default  |
|---------|-----------|--------------------|----------|
| summary | DataFrame | Summary DataFrame. | required |
| rfm     | DataFrame | RFM DataFrame.     | required |

Returns:

| Type                        | Description                                                             |
|-----------------------------|-------------------------------------------------------------------------|
| tuple[DataFrame, DataFrame] | Summary DataFrame without duplicates; RFM DataFrame without duplicates. |

Source code in api/utils/full_dataset_preparation.py
def drop_duplicate_columns_for_merge(
    summary: pd.DataFrame, rfm: pd.DataFrame
) -> tuple[pd.DataFrame, pd.DataFrame]:
    """
    Removes duplicate columns from DataFrames before merging.

    Args:
        summary (pd.DataFrame): Summary DataFrame.
        rfm (pd.DataFrame): RFM DataFrame.

    Returns:
        tuple[pd.DataFrame, pd.DataFrame]: 
            - Summary DataFrame without duplicates.
            - RFM DataFrame without duplicates.
    """

    summary = summary.drop(columns=['frequency', 'recency', 'monetary_value'])
    summary.reset_index(inplace=True)

    rfm = rfm.drop(columns=['product', 'revenue', 'Months_Since_Start', 'office_location'])

    return summary, rfm
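
Usage sketch: the two cleaned frames are typically joined on the shared account key afterwards (the merge itself happens outside this function; `on='account'` is an assumption based on the columns produced above):

summary_to_merge, rfm_to_merge = drop_duplicate_columns_for_merge(summary, rfm)
full_df = summary_to_merge.merge(rfm_to_merge, on='account', how='inner')  # hypothetical join key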

expand_rfm_features(rfm)

Expands RFM metrics with additional calculations and segmentation.

Parameters:

| Name | Type      | Description            | Default  |
|------|-----------|------------------------|----------|
| rfm  | DataFrame | RFM metrics DataFrame. | required |

Returns:

| Type      | Description                                            |
|-----------|--------------------------------------------------------|
| DataFrame | Updated DataFrame with new features and segmentation.  |

Source code in api/utils/full_dataset_preparation.py
def expand_rfm_features(rfm: pd.DataFrame) -> pd.DataFrame:
    """
    Expands RFM metrics with additional calculations and segmentation.

    Args:
        rfm (pd.DataFrame): RFM metrics DataFrame.

    Returns:
        pd.DataFrame: Updated DataFrame with new features and segmentation.
    """

    custom_segment_map = {
        '111': 'Dormant',
        r'11[2-3]': 'Inactive Low Spenders',
        r'12[1-3]': 'Rare Low Spenders',
        r'13[1-3]': 'Occasional Low Spenders',
        '411': 'New Enthusiasts',
        r'41[2-3]': 'New Potential Customers',
        # NOTE: '412' is already captured by r'41[2-3]' above, so this entry never applies.
        '412': 'New Interested Parties',
        r'3[1-2]1': 'Disinterested Customers',
        r'31[2-3]': 'Rarely Interested Customers',
        '322': 'Stable Customers',
        r'32[3-4]': 'Consistent Buyers',
        r'42[2-3]': 'Moderately Engaged Customers',
        r'43[1-3]': 'Growing Engagement',
        '421': 'Beginning Customers',
        '432': 'Future Big Customers',
        r'44[2-3]': 'Loyal Customers',
        '444': 'Champions',
        r'4[2-3]4': 'Balanced High Value Customers',
        r'[3-4][3-4]1': 'High Value, Rare Buyers',
        r'[2-4][2-3][2-3]': 'Balanced Customers',
        # r'.*': 'Other'
    }

    rfm['RFM_Custom_Segment'] = rfm['RFM_Score'].replace(custom_segment_map, regex=True)

    discount_rate = 0.01  # Monthly inflation (simulation)
    rfm['Months_Since_Start'] = ((rfm['last_purchase'] - rfm['first_purchase'].min()).dt.days / 30).astype(int)
    rfm['Actual_CLTV'] = rfm['Monetary'] / ((1 + discount_rate) ** rfm['Months_Since_Start'])

    rfm['RF_Ratio'] = rfm['Recency'] / (rfm['Frequency'] + 1)
    rfm['ATV'] = rfm['Monetary'] / rfm['Frequency']

    scaler = MinMaxScaler()
    rfm_scaled = scaler.fit_transform(rfm[['Recency', 'Frequency', 'Monetary']])

    rfm['engagement_score'] = rfm_scaled[:, 1] * 0.4 + rfm_scaled[:, 2] * 0.4 - rfm_scaled[:, 0] * 0.2

    return rfm
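
To make the discounting step concrete: with discount_rate = 0.01, a customer whose Monetary total is 10,000 and whose last purchase falls 12 months after the earliest first purchase gets Actual_CLTV = 10000 / 1.01**12 ≈ 8874.49. A minimal check of the regex-based segmentation, using a subset of the map above on hypothetical scores:

import pandas as pd

scores = pd.Series(['111', '444', '432'])
segments = scores.replace(
    {'111': 'Dormant', '444': 'Champions', '432': 'Future Big Customers'},
    regex=True,
)
print(segments.tolist())  # ['Dormant', 'Champions', 'Future Big Customers']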

fit_predict_bg_nbd_model(df, today_date=datetime(2018, 1, 1))

Fits and makes predictions using the Beta-Geometric/NBD model.

Parameters:

| Name       | Type      | Description                                              | Default               |
|------------|-----------|----------------------------------------------------------|-----------------------|
| df         | DataFrame | DataFrame with transaction data.                         | required              |
| today_date | date      | Reference date for calculations. Default is 2018-01-01.  | datetime(2018, 1, 1)  |

Returns:

| Type                            | Description                                                |
|---------------------------------|------------------------------------------------------------|
| tuple[DataFrame, BetaGeoFitter] | DataFrame with predictions; fitted BetaGeoFitter instance. |

Source code in api/utils/full_dataset_preparation.py
def fit_predict_bg_nbd_model(
    df: pd.DataFrame, today_date: date = datetime(2018, 1, 1)
) -> tuple[pd.DataFrame, BetaGeoFitter]:
    """
    Fits and makes predictions using the Beta-Geometric/NBD model.

    Args:
        df (pd.DataFrame): DataFrame with transaction data.
        today_date (date): Reference date for calculations. Default is 2018-01-01.

    Returns:
        tuple[pd.DataFrame, BetaGeoFitter]: 
            - DataFrame with predictions.
            - Fitted BetaGeoFitter instance.
    """

    summary = summary_data_from_transaction_data(
        df,
        customer_id_col='account',
        datetime_col='close_date',
        monetary_value_col='close_value',
        observation_period_end=today_date,
    )

    bgf = BetaGeoFitter(penalizer_coef=0.005)
    bgf.fit(summary['frequency'], summary['recency'], summary['T'])

    summary['prob_alive'] = bgf.conditional_probability_alive(
        summary['frequency'], summary['recency'], summary['T']
    )

    # Expected number of purchases over several horizons (in days).
    horizons = {
        'day': 1,
        'week': 7,
        'monthly': 30,
        'bimonthly': 61,
        'trimester': 92,
        'half_year': 182,
        'year': 365,
    }
    for label, days in horizons.items():
        summary[f'expected_purchases_{label}'] = bgf.conditional_expected_number_of_purchases_up_to_time(
            days, summary['frequency'], summary['recency'], summary['T']
        )

    export_beta_geo_fitter(bgf)

    return summary, bgf
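
A minimal usage sketch, assuming df holds the consolidated 'Won' deals with account, close_date, and close_value columns (as produced by full_dataset_preparation):

from datetime import datetime

summary, bgf = fit_predict_bg_nbd_model(df, today_date=datetime(2018, 1, 1))
print(summary[['frequency', 'recency', 'T', 'prob_alive', 'expected_purchases_monthly']].head())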

fit_predict_gamma_gamma_model(summary)

Fits and makes predictions using the Gamma-Gamma model.

Parameters:

| Name    | Type      | Description        | Default  |
|---------|-----------|--------------------|----------|
| summary | DataFrame | Summary DataFrame. | required |

Returns:

| Type                               | Description                                                          |
|------------------------------------|----------------------------------------------------------------------|
| tuple[DataFrame, GammaGammaFitter] | Updated DataFrame with predictions; fitted GammaGammaFitter instance. |

Source code in api/utils/full_dataset_preparation.py
def fit_predict_gamma_gamma_model(summary: pd.DataFrame) -> tuple[pd.DataFrame, GammaGammaFitter]:
    """
    Fits and makes predictions using the Gamma-Gamma model.

    Args:
        summary (pd.DataFrame): Summary DataFrame.

    Returns:
        tuple[pd.DataFrame, GammaGammaFitter]: 
            - Updated DataFrame with predictions.
            - Fitted GammaGammaFitter instance.
    """

    ggf = GammaGammaFitter(penalizer_coef=0.05)
    ggf.fit(summary['frequency'], summary['monetary_value'])

    summary['expected_average_profit'] = ggf.conditional_expected_average_profit(
        summary['frequency'], summary['monetary_value']
    )

    export_gamma_gamma_fitter(ggf)

    return summary, ggf
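
Note that the Gamma-Gamma model assumes monetary value is independent of purchase frequency and is conventionally fit only on returning customers with positive spend. Whether that filter is applied upstream here is not shown, so a defensive variant (a sketch, not the module's code) would be:

returning = summary[(summary['frequency'] > 0) & (summary['monetary_value'] > 0)]
ggf = GammaGammaFitter(penalizer_coef=0.05)
ggf.fit(returning['frequency'], returning['monetary_value'])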

full_dataset_preparation(session, deal_stage='Won', today_date=datetime(2018, 1, 1))

Prepares a consolidated dataset for analysis.

Parameters:

| Name       | Type    | Description                                                       | Default               |
|------------|---------|-------------------------------------------------------------------|-----------------------|
| session    | Session | Database session for loading data.                                | required              |
| deal_stage | str     | The deal stage to filter by. Default is 'Won'.                    | 'Won'                 |
| today_date | date    | Reference date for temporal calculations. Default is 2018-01-01.  | datetime(2018, 1, 1)  |

Returns:

| Type                                   | Description                                                           |
|----------------------------------------|-----------------------------------------------------------------------|
| tuple[DataFrame, DataFrame, DataFrame] | Summary data for merging; RFM data for merging; consolidated dataset. |

Source code in api/utils/full_dataset_preparation.py
def full_dataset_preparation(
    session: Session, deal_stage: str = 'Won', today_date: date = datetime(2018, 1, 1)
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Prepares a consolidated dataset for analysis.

    Args:
        session (Session): Database session for loading data.
        deal_stage (str): The deal stage to filter by. Default is 'Won'.
        today_date (date): Reference date for temporal calculations. Default is 2018-01-01.

    Returns:
        tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: 
            - Summary data for merging.
            - RFM data for merging.
            - Consolidated dataset.
    """

    accounts_df = load_accounts_data(session)
    products_df = load_products_data(session)
    sales_pipeline_df = load_sales_pipeline_data(session)
    sales_teams_df = load_sales_teams_data(session)

    dataframes = [accounts_df, products_df, sales_pipeline_df, sales_teams_df]

    filtered_dataframes = []
    for raw_df in dataframes:
        if 'id' in raw_df.columns:
            filtered_dataframes.append(raw_df.drop(columns=['id']))
        else:
            filtered_dataframes.append(raw_df)

    accounts_df, products_df, sales_pipeline_df, sales_teams_df = filtered_dataframes

    sales_pipeline_df.loc[sales_pipeline_df['product'] == 'GTXPro', 'product'] = 'GTX Pro'

    df = (
        sales_pipeline_df
        .merge(accounts_df, on='account', how='inner')
        .merge(products_df, on='product', how='inner')
        .merge(sales_teams_df, on='sales_agent', how='inner')
    )

    df = make_preprocessing(df)

    if deal_stage == 'Won':
        df = make_won_pre_feature_engineering(df)
    else:
        raise NotImplementedError('Only the "Won" deal stage analysis is implemented for now.')

    df = make_filter_by_deal_stage(df, deal_stage)
    rfm = make_rfm_enrichment(df, today_date)
    rfm = expand_rfm_features(rfm)

    summary, bgf = fit_predict_bg_nbd_model(df, today_date)
    summary, ggf = fit_predict_gamma_gamma_model(summary)
    summary = make_cltv_predictions(summary, bgf, ggf)

    summary_to_merge, rfm_to_merge = drop_duplicate_columns_for_merge(summary, rfm)

    return summary_to_merge, rfm_to_merge, df
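
End-to-end usage sketch (the connection URL is a hypothetical placeholder; the session setup follows standard SQLAlchemy patterns):

from sqlalchemy import create_engine
from sqlalchemy.orm import Session

engine = create_engine('postgresql://user:password@localhost/crm')  # hypothetical URL
with Session(engine) as session:
    summary_to_merge, rfm_to_merge, df = full_dataset_preparation(session, deal_stage='Won')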

load_accounts_data(session)

Loads account data from the database.

Parameters:

| Name    | Type    | Description       | Default  |
|---------|---------|-------------------|----------|
| session | Session | Database session. | required |

Returns:

| Type      | Description                        |
|-----------|------------------------------------|
| DataFrame | DataFrame containing account data. |

Source code in api/utils/full_dataset_preparation.py
def load_accounts_data(session: Session) -> pd.DataFrame:
    """
    Loads account data from the database.

    Args:
        session (Session): Database session.

    Returns:
        pd.DataFrame: DataFrame containing account data.
    """

    query = select(AccountsSourceModel)
    results = session.execute(query).all()

    data = [
        {column: getattr(row, column) for column in row.__table__.columns.keys()}
        for (row,) in results
    ]

    df = pd.DataFrame(data)
    return df
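
The dict comprehension converts each mapped ORM row into a plain dict keyed by its table columns. An often simpler equivalent, assuming the session is bound to an engine, is pandas' own SQL reader (a hypothetical alternative, not what the module uses):

import pandas as pd

df = pd.read_sql(select(AccountsSourceModel), session.bind)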

load_products_data(session)

Loads product data from the database.

Parameters:

| Name    | Type    | Description       | Default  |
|---------|---------|-------------------|----------|
| session | Session | Database session. | required |

Returns:

| Type      | Description                        |
|-----------|------------------------------------|
| DataFrame | DataFrame containing product data. |

Source code in api/utils/full_dataset_preparation.py
def load_products_data(session: Session) -> pd.DataFrame:
    """
    Loads product data from the database.

    Args:
        session (Session): Database session.

    Returns:
        pd.DataFrame: DataFrame containing product data.
    """

    query = select(ProductsSourceModel)
    results = session.execute(query).all()

    data = [
        {column: getattr(row, column) for column in row.__table__.columns.keys()}
        for (row,) in results
    ]

    df = pd.DataFrame(data)
    return df

load_sales_pipeline_data(session)

Loads sales pipeline data from the database.

Parameters:

| Name    | Type    | Description       | Default  |
|---------|---------|-------------------|----------|
| session | Session | Database session. | required |

Returns:

| Type      | Description                               |
|-----------|-------------------------------------------|
| DataFrame | DataFrame containing sales pipeline data. |

Source code in api/utils/full_dataset_preparation.py
def load_sales_pipeline_data(session: Session) -> pd.DataFrame:
    """
    Loads sales pipeline data from the database.

    Args:
        session (Session): Database session.

    Returns:
        pd.DataFrame: DataFrame containing sales pipeline data.
    """    

    query = select(SalesPipelineSourceModel)
    results = session.execute(query).all()

    data = [
        {column: getattr(row, column) for column in row.__table__.columns.keys()}
        for (row,) in results
    ]

    df = pd.DataFrame(data)
    return df

load_sales_teams_data(session)

Loads sales teams data from the database.

Parameters:

| Name    | Type    | Description       | Default  |
|---------|---------|-------------------|----------|
| session | Session | Database session. | required |

Returns:

| Type      | Description                            |
|-----------|----------------------------------------|
| DataFrame | DataFrame containing sales teams data. |

Source code in api/utils/full_dataset_preparation.py
def load_sales_teams_data(session: Session) -> pd.DataFrame:
    """
    Loads sales teams data from the database.

    Args:
        session (Session): Database session.

    Returns:
        pd.DataFrame: DataFrame containing sales teams data.
    """

    query = select(SalesTeamsSourceModel)
    results = session.execute(query).all()

    data = [
        {column: getattr(row, column) for column in row.__table__.columns.keys()}
        for (row,) in results
    ]

    df = pd.DataFrame(data)
    return df

make_cltv_predictions(summary, bgf, ggf)

Calculates and adds CLTV predictions to the DataFrame.

Parameters:

| Name    | Type             | Description               | Default  |
|---------|------------------|---------------------------|----------|
| summary | DataFrame        | Summary DataFrame.        | required |
| bgf     | BetaGeoFitter    | Beta-Geometric/NBD model. | required |
| ggf     | GammaGammaFitter | Gamma-Gamma model.        | required |

Returns:

| Type      | Description                            |
|-----------|----------------------------------------|
| DataFrame | DataFrame with added CLTV predictions. |

Source code in api/utils/full_dataset_preparation.py
def make_cltv_predictions(
    summary: pd.DataFrame, bgf: BetaGeoFitter, ggf: GammaGammaFitter
) -> pd.DataFrame:
    """
    Calculates and adds CLTV predictions to the DataFrame.

    Args:
        summary (pd.DataFrame): Summary DataFrame.
        bgf (BetaGeoFitter): Beta-Geometric/NBD model.
        ggf (GammaGammaFitter): Gamma-Gamma model.

    Returns:
        pd.DataFrame: DataFrame with added CLTV predictions.
    """

    summary['Predicted_Year_CLTV'] = ggf.customer_lifetime_value(
        bgf,
        summary['frequency'],
        summary['recency'],
        summary['T'],
        summary['monetary_value'],
        time=12,  # prediction horizon in months
        discount_rate=0.01,  # monthly discount rate
    )

    summary['Predicted_CLTV_Segment'] = pd.qcut(
        summary['Predicted_Year_CLTV'], q=3, labels=['Low', 'Medium', 'High']
    )

    return summary
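
pd.qcut assigns equal-frequency tertiles, so 'Low', 'Medium', and 'High' each hold roughly a third of the customers regardless of the CLTV scale. A tiny illustration with hypothetical values:

import pandas as pd

cltv = pd.Series([100, 200, 300, 4000, 5000, 60000])
print(pd.qcut(cltv, q=3, labels=['Low', 'Medium', 'High']).tolist())
# ['Low', 'Low', 'Medium', 'Medium', 'High', 'High']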

make_filter_by_deal_stage(df, deal_stage)

Filters the DataFrame by deal stage.

Parameters:

| Name       | Type      | Description              | Default  |
|------------|-----------|--------------------------|----------|
| df         | DataFrame | Consolidated DataFrame.  | required |
| deal_stage | str       | Deal stage to filter by. | required |

Returns:

| Type      | Description                       |
|-----------|-----------------------------------|
| DataFrame | DataFrame filtered by deal stage. |

Source code in api/utils/full_dataset_preparation.py
def make_filter_by_deal_stage(df: pd.DataFrame, deal_stage: str) -> pd.DataFrame:
    """
    Filters the DataFrame by deal stage.

    Args:
        df (pd.DataFrame): Consolidated DataFrame.
        deal_stage (str): Deal stage to filter by.

    Returns:
        pd.DataFrame: DataFrame filtered by deal stage.
    """

    deal_stage_df = df[df['deal_stage'] == deal_stage]
    return deal_stage_df

make_preprocessing(df)

Performs preprocessing on the consolidated DataFrame.

Parameters:

| Name | Type      | Description             | Default  |
|------|-----------|-------------------------|----------|
| df   | DataFrame | Consolidated DataFrame. | required |

Returns:

| Type      | Description             |
|-----------|-------------------------|
| DataFrame | Preprocessed DataFrame. |

Source code in api/utils/full_dataset_preparation.py
def make_preprocessing(df: pd.DataFrame) -> pd.DataFrame:
    """
    Performs preprocessing on the consolidated DataFrame.

    Args:
        df (pd.DataFrame): Consolidated DataFrame.

    Returns:
        pd.DataFrame: Preprocessed DataFrame.
    """

    df['engage_date'] = pd.to_datetime(df['engage_date'])
    df['close_date'] = pd.to_datetime(df['close_date'])
    df['close_value'] = df['close_value'].astype(float)
    df['revenue'] = df['revenue'].astype(float)
    df['employees'] = df['employees'].astype(int)
    df['sales_price'] = df['sales_price'].astype(float)

    return df
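
One caveat worth noting: .astype(int) raises on missing values, so this step implicitly assumes employees has no NaNs. If missing values were possible, pandas' nullable integer dtype would be a tolerant alternative (an assumption about intent, not what the module does):

df['employees'] = df['employees'].astype('Int64')  # nullable integer, keeps missing values as <NA>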

make_rfm_enrichment(df, today_date)

Enriches the DataFrame with RFM metrics.

Parameters:

| Name       | Type      | Description                      | Default  |
|------------|-----------|----------------------------------|----------|
| df         | DataFrame | Consolidated DataFrame.          | required |
| today_date | date      | Reference date for calculations. | required |

Returns:

| Type      | Description                       |
|-----------|-----------------------------------|
| DataFrame | DataFrame with added RFM metrics. |

Source code in api/utils/full_dataset_preparation.py
def make_rfm_enrichment(df: pd.DataFrame, today_date: date) -> pd.DataFrame:
    """
    Enriches the DataFrame with RFM metrics.

    Args:
        df (pd.DataFrame): Consolidated DataFrame.
        today_date (date): Reference date for calculations.

    Returns:
        pd.DataFrame: DataFrame with added RFM metrics.
    """

    rfm = df.groupby('account').agg({
        'close_date': ['min', 'max', lambda x: (today_date - x.max()).days],
        'account': 'count',
        'close_value': 'sum',
        'office_location': 'first',
        'product': 'first',
        'revenue': 'first',
    })

    accounts = rfm.index
    rfm = rfm.reset_index(drop=True)
    rfm.columns = ['_'.join(col).strip() if isinstance(col, tuple) else col for col in rfm.columns]

    rfm.rename(columns={
        'close_date_min': 'first_purchase',
        'close_date_max': 'last_purchase',
        'close_date_<lambda_0>': 'Recency',
        'account_count': 'Frequency',
        'close_value_sum': 'Monetary',
        'office_location_first': 'office_location',
        'product_first': 'product',
        'revenue_first': 'revenue'
    }, inplace=True)

    rfm['account'] = accounts

    rfm['R_Score'] = pd.cut(rfm['Recency'], bins=4, labels=[4, 3, 2, 1])
    rfm['F_Score'] = pd.cut(rfm['Frequency'], bins=4, labels=[1, 2, 3, 4])
    rfm['M_Score'] = pd.cut(rfm['Monetary'], bins=4, labels=[1, 2, 3, 4])
    rfm['RFM_Score'] = rfm['R_Score'].astype(str) + rfm['F_Score'].astype(str) + rfm['M_Score'].astype(str)

    return rfm
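
The R/F/M scores use pd.cut, i.e. equal-width bins over each metric's range (not equal-frequency quantiles), with the recency labels reversed so that more recent buyers score higher. A small illustration with hypothetical recency values:

import pandas as pd

recency_days = pd.Series([5, 40, 200, 360])
print(pd.cut(recency_days, bins=4, labels=[4, 3, 2, 1]).tolist())
# [4, 4, 2, 1]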

make_won_pre_feature_engineering(df)

Performs feature engineering for 'Won' deal stage data.

Parameters:

| Name | Type      | Description             | Default  |
|------|-----------|-------------------------|----------|
| df   | DataFrame | Consolidated DataFrame. | required |

Returns:

| Type      | Description                                               |
|-----------|-----------------------------------------------------------|
| DataFrame | DataFrame with additional features for 'Won' deal stage. |

Source code in api/utils/full_dataset_preparation.py
def make_won_pre_feature_engineering(df: pd.DataFrame) -> pd.DataFrame:
    """
    Performs feature engineering for 'Won' deal stage data.

    Args:
        df (pd.DataFrame): Consolidated DataFrame.

    Returns:
        pd.DataFrame: DataFrame with additional features for 'Won' deal stage.
    """

    won_lost_deal_stage_df = df[(df['deal_stage'] == 'Won') | (df['deal_stage'] == 'Lost')]

    total_opportunities = won_lost_deal_stage_df.groupby('sales_agent')['opportunity_id'].count()
    won_opportunities = (
        won_lost_deal_stage_df[won_lost_deal_stage_df['deal_stage'] == 'Won']
        .groupby('sales_agent')['opportunity_id']
        .count()
    )

    close_rate = (won_opportunities / total_opportunities).fillna(0) * 100

    df['sales_cycle_duration'] = (df['close_date'] - df['engage_date']).dt.days
    df['agent_close_rate'] = df['sales_agent'].map(close_rate)
    df['opportunities_per_account'] = df.groupby('account')['opportunity_id'].transform('count')
    df['opportunities_per_sales_agent'] = df.groupby('sales_agent')['opportunity_id'].transform('count')

    return df
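
To make the close-rate step concrete: an agent with 8 'Won' and 2 'Lost' opportunities gets agent_close_rate = 8 / 10 * 100 = 80.0, which .map then broadcasts to every row for that agent. A self-contained check with hypothetical data:

import pandas as pd

deals = pd.DataFrame({
    'sales_agent': ['Ana'] * 10,
    'deal_stage': ['Won'] * 8 + ['Lost'] * 2,
    'opportunity_id': range(10),
})
total = deals.groupby('sales_agent')['opportunity_id'].count()
won = deals[deals['deal_stage'] == 'Won'].groupby('sales_agent')['opportunity_id'].count()
print((won / total).fillna(0) * 100)  # Ana: 80.0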