Forecasting gas prices demo

Author
Published

October 3, 2022

This notebook implements a forecast for US gas prices for the next 13 weeks (roughly one quarter), broken down by region. I am taking this as an opportunity to kick the tires on three new packages I’ve been excited to try out:

I hope it also just demonstrates what I believe to be an effective way to wrangle data in Python, using Pandas method-chaining, Altair plots, black to automatically cleanup after myself

I want to be upfront and state that I am an advisor for both Linea and Nixtla. I think both companies are building cool technology for data science and data engineering, and using their packages is a good way for me to contribute as an advisor. If you have feedback about either package, please pass it along to me and I’ll make sure they get it!

%load_ext lineapy
%load_ext nb_black
from statsforecast.models import AutoARIMA

1. Download and clean up data

We are going to get data from the EIA’s website, which has weekly gas prices by region since 1993. They (sadly) make the data available in an XLS file, but fortunately Pandas can read this natively, and we can do almost all the needed cleanup in a few lines.

response = requests.get("https://www.eia.gov/petroleum/gasdiesel/xls/pswrgvwall.xls")
df = pd.read_excel(
    response.content,
    sheet_name="Data 12",
    index_col=0,
    skiprows=2,
    parse_dates=["Date"],
).rename(
    columns=lambda c: re.sub(
        "\(PADD 1[A-C]\)",
        "",
        c.replace("Weekly ", "").replace(
            " All Grades All Formulations Retail Gasoline Prices  (Dollars per Gallon)",
            "",
        ),
    ).strip()
)

Saving this dataframe in LineaPy makes it available as an artifact I can schedule to be re-created regularly.

lineapy.save(df, "weekly_gas_price_data")
LineaArtifact(name='weekly_gas_price_data', _version=0)

2. Tidy and explore the data

Our first step here is to tidy the data.

  • convert from wide to long format
  • rename columns
  • clean up missing values

After that we’ll do some basic EDA.

df_long = (
    df.reset_index()
    .melt(id_vars=["Date"], var_name="region", value_name="price")
    .rename(columns={"Date": "week"})
    .sort_values(["region", "week"])
    .assign(
        # if we're missing one value, just use the last value
        # (happens twice)
        price=lambda x: x["price"].combine_first(x.groupby("region")["price"].shift(1)),
        # we'll forecast log(price) and then transform
        log_price=lambda x: np.log(x["price"]),
        # percentage price changes are approximately the difference in log(price)
        price_change=lambda x: (
            x["log_price"] - x.groupby("region")["log_price"].shift(1)
        ),
    )
    .query("price == price")  # filter out NAs
)

lineapy.save(df_long, "weekly_gas_price_data_long")
df_long.head()
week region price log_price price_change
28231 2003-05-26 Boston, MA 1.555 0.441476 NaN
28232 2003-06-02 Boston, MA 1.547 0.436318 -0.005158
28233 2003-06-09 Boston, MA 1.534 0.427879 -0.008439
28234 2003-06-16 Boston, MA 1.549 0.437610 0.009731
28235 2003-06-23 Boston, MA 1.544 0.434376 -0.003233

Count how many non-missing values we have so we can get a sense of how long the time series are.

df_long.groupby("region")["price"].count().reset_index().pipe(alt.Chart).encode(
    x=alt.X("price", title="Cases"), y=alt.Y("region", sort=alt.SortField("price"))
).mark_bar()

As time goes on we get prices for more regions. They gradually introduced more fine-grained geographies over time. By 2003 or so all regions are available.

df_long.groupby("week")["price"].count().reset_index().pipe(alt.Chart).encode(
    x="week", y=alt.Y("price", title="Count")
).mark_line()

This plot shows the distribution of price changes. Clearly non-normal, it looks like a Laplace distribution. Interestingly the median is less than mean (which is positive).

df_long["price_change"].mean() * 52
0.043229833304911806
(
    df_long.query("price_change == price_change")
    .sample(5000)
    .pipe(alt.Chart)
    .transform_density("price_change")
    .encode(x="value:Q", y="density:Q")
    .mark_area()
)
all_regions = df_long["region"].unique().tolist()
lineapy.save(all_regions, "all_regions")
num_regions = len(all_regions)
num_regions
28

This is the smallest date for which we have data for all regions, which is useful as we want a matrix of complete cases.

complete_case_date = (
    df_long.groupby("week")["price"]
    .count()
    .reset_index()
    .query(f"price == {num_regions}")["week"]
    .min()
).strftime("%Y-%m-%d")
complete_case_date
'2003-05-26'
(
    df_long.groupby("region")["price_change"]
    .mean()
    .reset_index()
    .assign(annual_price_change=lambda x: x["price_change"] * 52)
    .pipe(alt.Chart)
    .encode(
        x=alt.X("region", sort=alt.SortField("annual_price_change")),
        y=alt.Y("annual_price_change", title="Annual Price Growth"),
    )
    .mark_bar()
)

Matrix factorizations are fun

wide = (
    df_long.query(f"week > '{complete_case_date}'")[["week", "region", "price_change"]]
    .set_index("week")
    .pivot(columns="region", values="price_change")
)
matrix = wide.values
print(matrix.shape)
u, d, v = svd(matrix)
(1009, 28)
scree_plot = (
    pd.DataFrame({"eigenvalue": d, "index": np.arange(d.shape[0])})
    .pipe(alt.Chart)
    .encode(x="index", y="eigenvalue")
    .mark_point()
)

lineapy.save(scree_plot, "scree_plot")
scree_plot
components = pd.DataFrame(
    v, columns=[f"component_{i}" for i in range(v.shape[0])], index=wide.columns
).reset_index()

components_plot = (
    components.pipe(alt.Chart)
    .encode(x="component_0", y="component_1", text="region")
    .mark_text()
    .interactive()
)

lineapy.save(components_plot, "components_plot")
components_plot

ACF plots: are price changes mean-reverting?

Here we check if changes in gas prices have any autocorrelation. If they do, it indicates there is some momentum: if prices are increasing they are likely to increase next week and we should fill up our tank as soon as possible. If there is no autocorrelation, there’s no real benefit to timing our gas purchases. If the autocorrelation is negative, we may want to wait awhile after prices increase to fill up.

Answer: looks like there’s strong positive autocorrelation lasting about 6 weeks. There does appear to be some mean reversion on a longer time-scale (1-2 quarters).

region = "U.S."
auto_correlation = (
    df_long.query(f"region == '{region}'")
    .query("price_change == price_change")["price_change"]
    .pipe(acf)
)
acf_plot = (
    pd.DataFrame({"rho": auto_correlation, "lag": np.arange(auto_correlation.shape[0])})
    .pipe(alt.Chart, title=region)
    .encode(x="lag", y="rho")
    .mark_bar()
)
lineapy.save(acf_plot, "acf_plot2")
acf_plot

3. Make some forecasts

Note that I’m just going to hard-code these parameters here, and then use LineaPy turn them into proper parameters later. This allows me to quickly iterate on a single forecast interactively and then later productionize seamlessly.

H = 13
CI = 80
width = 300
height = 250
region = "U.S."
cutoff_date = "2022-10-02"
plot_start_date = "2022-01-01"
plot_title = f"{region} (as of {cutoff_date})"
region_df = df_long.query(f"region == '{region}'")
train = region_df.query(f"week < '{cutoff_date}'")
m_aa = AutoARIMA()
m_aa.fit(train["log_price"].values)
AutoARIMA
raw_forecast = m_aa.predict(h=H, level=(CI,))
raw_forecast_exp = {key: np.exp(value) for key, value in raw_forecast.items()}
forecast = pd.DataFrame(raw_forecast_exp).assign(
    week=pd.date_range(train["week"].max(), periods=H, freq="W")
    + pd.Timedelta("7 days")
)
forecast = pd.concat(
    [
        forecast,
        train.tail(1)
        .rename(columns={"price": "mean"})
        .assign(**{f"lo-{CI}": lambda x: x["mean"], f"hi-{CI}": lambda x: x["mean"]}),
    ]
)
forecast.head()
mean lo-80 hi-80 week region log_price price_change
0 3.860496 3.785185 3.937306 2022-10-09 NaN NaN NaN
1 3.877999 3.741205 4.019795 2022-10-16 NaN NaN NaN
2 3.899743 3.705606 4.104051 2022-10-23 NaN NaN NaN
3 3.917323 3.665836 4.186063 2022-10-30 NaN NaN NaN
4 3.928728 3.622444 4.260909 2022-11-06 NaN NaN NaN
uncertainty_plot = (
    forecast.pipe(alt.Chart, height=height, width=width)
    .encode(
        x="week",
        y=alt.Y(f"lo-{CI}", title="Price"),
        y2=alt.Y2(f"hi-{CI}", title="Price"),
    )
    .mark_area(opacity=0.2)
)

history_plot = (
    region_df.query(f"week >= '{plot_start_date}'")
    .pipe(alt.Chart, title=plot_title)
    .encode(x=alt.X("week", title="Week"), y=alt.Y("price", title="Price"))
    .mark_line()
)

forecast_plot = forecast.pipe(alt.Chart).encode(x="week", y="mean").mark_line()

cutoff_plot = (
    train.tail(1).pipe(alt.Chart).encode(x="week").mark_rule(strokeDash=[10, 2])
)

full_plot = uncertainty_plot + history_plot + forecast_plot + cutoff_plot
lineapy.save(full_plot, "gas_price_forecast")
LineaArtifact(name='gas_price_forecast', _version=0)

The forecast is kind of boring actually, the recent trend has been flat so it looks unlikely prices could climb as high as they were in June or even March.

full_plot

4. Build a pipeline

Parameter Refactor

Here is some magic. I can take the last plot I made and directly turn it into a function that makes the plot again – but where I can set new options about the parameters I specified!

I find this to be a helpful pattern – you can prototype in a procedural way that’s easy to read, and then let LineaPy clean up and turn it into something re-usable later.

forecast_region = lineapy.get_function(
    ["gas_price_forecast"],
    input_parameters=[
        "region",
        "cutoff_date",
        "H",
        "width",
        "height",
        "plot_start_date",
    ],
    reuse_pre_computed_artifacts=["weekly_gas_price_data_long"],
)

Let’s got back in time and look at a forecast from when prices hit their peak in California.

result = forecast_region(
    region="California", cutoff_date="2022-06-07", H=15, width=300, height=250
)
result["gas_price_forecast"]
plots = []
for region in all_regions:
    result = forecast_region(
        region=region, cutoff_date=cutoff_date, height=200, width=200
    )
    plots.append(result["gas_price_forecast"])
chart = alt.vconcat()
for i, plot in enumerate(plots):
    if i % 4 == 0:
        row = alt.hconcat()
        chart &= row
    row |= plot
chart

Save the final artifact

One thing I can do is just save the final result here somewhere I may reference elsewhere. For instance, in another document I could get this chart and render it (which I do!).

lineapy.save(chart, "all_forecasts_plot")
LineaArtifact(name='all_forecasts_plot', _version=0)

Generate a runnable pipeline

Having kept track of everything we did to make our forecast plots, LineaPy can generate an entire Airflow pipeline that faithfully replicates what I’ve done in this notebook. None of the side-quest analyses or data checking will be included, and the packages I used (and their versions are captured. The one thing I do have to do is include a dependency graph between the different artifacts so that the DAG can reflect this.

LineaPy is useful locally, but to share my results with others and run pipelines in a different environment, it can also be hosted as in this tutorial.

lineapy.to_pipeline(
    ["gas_price_forecast", "weekly_gas_price_data", "weekly_gas_price_data_long"],
    dependencies={
        "gas_price_forecast": {"weekly_gas_price_data_long": {"weekly_gas_price_data"}}
    },
    pipeline_name="gas_price_forecast",
    output_dir="pipeline",
    framework="AIRFLOW",
    input_parameters=["region", "cutoff_date"],
)
Generated module file: pipeline/gas_price_forecast_module.py                    
Generated requirements file: pipeline/gas_price_forecast_requirements.txt       
Generated DAG file: pipeline/gas_price_forecast_dag.py                          
Generated Docker file: pipeline/gas_price_forecast_Dockerfile                   
PosixPath('pipeline')
!cat pipeline/gas_price_forecast_dag.py
import pathlib
import pickle

import gas_price_forecast_module
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago


def dag_setup():
    pickle_folder = pathlib.Path("/tmp").joinpath("gas_price_forecast")
    if not pickle_folder.exists():
        pickle_folder.mkdir()


def dag_teardown():
    pickle_files = pathlib.Path("/tmp").joinpath("gas_price_forecast").glob("*.pickle")
    for f in pickle_files:
        f.unlink()


def task_weekly_gas_price_data():

    df = gas_price_forecast_module.get_weekly_gas_price_data()

    pickle.dump(df, open("/tmp/gas_price_forecast/variable_df.pickle", "wb"))


def task_weekly_gas_price_data_long():

    df = pickle.load(open("/tmp/gas_price_forecast/variable_df.pickle", "rb"))

    df_long = gas_price_forecast_module.get_weekly_gas_price_data_long(df)

    pickle.dump(df_long, open("/tmp/gas_price_forecast/variable_df_long.pickle", "wb"))


def task_gas_price_forecast(cutoff_date, region):

    cutoff_date = str(cutoff_date)

    region = str(region)

    df_long = pickle.load(open("/tmp/gas_price_forecast/variable_df_long.pickle", "rb"))

    full_plot = gas_price_forecast_module.get_gas_price_forecast(
        cutoff_date, df_long, region
    )

    pickle.dump(
        full_plot, open("/tmp/gas_price_forecast/variable_full_plot.pickle", "wb")
    )


default_dag_args = {
    "owner": "airflow",
    "retries": 2,
    "start_date": days_ago(1),
    "params": {"region": "U.S.", "cutoff_date": "2022-10-02"},
}

with DAG(
    dag_id="gas_price_forecast_dag",
    schedule_interval="*/15 * * * *",
    max_active_runs=1,
    catchup=False,
    default_args=default_dag_args,
) as dag:

    setup = PythonOperator(
        task_id="dag_setup",
        python_callable=dag_setup,
    )

    teardown = PythonOperator(
        task_id="dag_teardown",
        python_callable=dag_teardown,
    )

    weekly_gas_price_data = PythonOperator(
        task_id="weekly_gas_price_data_task",
        python_callable=task_weekly_gas_price_data,
    )

    weekly_gas_price_data_long = PythonOperator(
        task_id="weekly_gas_price_data_long_task",
        python_callable=task_weekly_gas_price_data_long,
    )

    gas_price_forecast = PythonOperator(
        task_id="gas_price_forecast_task",
        python_callable=task_gas_price_forecast,
        op_kwargs={
            "cutoff_date": "{{ params.cutoff_date }}",
            "region": "{{ params.region }}",
        },
    )

    weekly_gas_price_data >> weekly_gas_price_data_long

    weekly_gas_price_data_long >> gas_price_forecast

    setup >> weekly_gas_price_data

    gas_price_forecast >> teardown
!cat pipeline/gas_price_forecast_module.py
import argparse
import re

import altair as alt
import numpy as np
import pandas as pd
import requests
from statsforecast.models import AutoARIMA


def get_weekly_gas_price_data():
    response = requests.get(
        "https://www.eia.gov/petroleum/gasdiesel/xls/pswrgvwall.xls"
    )
    df = pd.read_excel(
        response.content,
        sheet_name="Data 12",
        index_col=0,
        skiprows=2,
        parse_dates=["Date"],
    ).rename(
        columns=lambda c: re.sub(
            "\(PADD 1[A-C]\)",
            "",
            c.replace("Weekly ", "").replace(
                " All Grades All Formulations Retail Gasoline Prices  (Dollars per Gallon)",
                "",
            ),
        ).strip()
    )
    return df


def get_weekly_gas_price_data_long(df):
    df_long = (
        df.reset_index()
        .melt(id_vars=["Date"], var_name="region", value_name="price")
        .rename(columns={"Date": "week"})
        .sort_values(["region", "week"])
        .assign(
            # if we're missing one value, just use the last value
            # (happens twice)
            price=lambda x: x["price"].combine_first(
                x.groupby("region")["price"].shift(1)
            ),
            # we'll forecast log(price) and then transform
            log_price=lambda x: np.log(x["price"]),
            # percentage price changes are approximately the difference in log(price)
            price_change=lambda x: (
                x["log_price"] - x.groupby("region")["log_price"].shift(1)
            ),
        )
        .query("price == price")  # filter out NAs
    )
    return df_long


def get_gas_price_forecast(cutoff_date, df_long, region):
    H = 13
    CI = 80
    width = 300
    height = 250
    plot_start_date = "2022-01-01"
    plot_title = f"{region} (as of {cutoff_date})"
    region_df = df_long.query(f"region == '{region}'")
    train = region_df.query(f"week < '{cutoff_date}'")
    m_aa = AutoARIMA()
    m_aa.fit(train["log_price"].values)
    raw_forecast = m_aa.predict(h=H, level=(CI,))
    raw_forecast_exp = {key: np.exp(value) for key, value in raw_forecast.items()}
    forecast = pd.DataFrame(raw_forecast_exp).assign(
        week=pd.date_range(train["week"].max(), periods=H, freq="W")
        + pd.Timedelta("7 days")
    )
    forecast = pd.concat(
        [
            forecast,
            train.tail(1)
            .rename(columns={"price": "mean"})
            .assign(
                **{f"lo-{CI}": lambda x: x["mean"], f"hi-{CI}": lambda x: x["mean"]}
            ),
        ]
    )
    uncertainty_plot = (
        forecast.pipe(alt.Chart, height=height, width=width)
        .encode(
            x="week",
            y=alt.Y(f"lo-{CI}", title="Price"),
            y2=alt.Y2(f"hi-{CI}", title="Price"),
        )
        .mark_area(opacity=0.2)
    )
    history_plot = (
        region_df.query(f"week >= '{plot_start_date}'")
        .pipe(alt.Chart, title=plot_title)
        .encode(x=alt.X("week", title="Week"), y=alt.Y("price", title="Price"))
        .mark_line()
    )
    forecast_plot = forecast.pipe(alt.Chart).encode(x="week", y="mean").mark_line()
    cutoff_plot = (
        train.tail(1).pipe(alt.Chart).encode(x="week").mark_rule(strokeDash=[10, 2])
    )
    full_plot = uncertainty_plot + history_plot + forecast_plot + cutoff_plot
    return full_plot


def run_session_including_weekly_gas_price_data(
    region="U.S.",
    cutoff_date="2022-10-02",
):
    # Given multiple artifacts, we need to save each right after
    # its calculation to protect from any irrelevant downstream
    # mutations (e.g., inside other artifact calculations)
    import copy

    artifacts = dict()
    df = get_weekly_gas_price_data()
    artifacts["weekly_gas_price_data"] = copy.deepcopy(df)
    df_long = get_weekly_gas_price_data_long(df)
    artifacts["weekly_gas_price_data_long"] = copy.deepcopy(df_long)
    full_plot = get_gas_price_forecast(cutoff_date, df_long, region)
    artifacts["gas_price_forecast"] = copy.deepcopy(full_plot)
    return artifacts


def run_all_sessions(
    region="U.S.",
    cutoff_date="2022-10-02",
):
    artifacts = dict()
    artifacts.update(run_session_including_weekly_gas_price_data(region, cutoff_date))
    return artifacts


if __name__ == "__main__":
    # Edit this section to customize the behavior of artifacts
    parser = argparse.ArgumentParser()
    parser.add_argument("--region", type=str, default="U.S.")
    parser.add_argument("--cutoff_date", type=str, default="2022-10-02")
    args = parser.parse_args()
    artifacts = run_all_sessions(
        region=args.region,
        cutoff_date=args.cutoff_date,
    )
    print(artifacts)
!cat pipeline/gas_price_forecast_requirements.txt
statsforecast==1.0.0
requests==2.28.0
re==2.2.1
altair==4.2.0
numpy==1.21.6
pandas==1.4.2
scipy==1.7.3
statsmodels==0.13.2