%load_ext lineapy
%load_ext nb_black
Forecasting gas prices demo
This notebook implements a forecast for US gas prices for the next 13 weeks (roughly one quarter), broken down by region. I am taking this as an opportunity to kick the tires on three new packages I’ve been excited to try out:
- statsforecast’s AutoARIMA implementation (created by Nixtla). Note that gas prices are ill-suited to Prophet’s model which focuses on modeling seasonality that’s not really present here.
- LineaPy to save data and plots, and generate a pipeline we can schedule to run regularly. This is super useful because I’m going to do a lot of extra exploratory work here that isn’t really relevant to the final automation step.
- Quarto the new hotness for rendering documents from Jupyter notebooks. If you’re reading this and think “hey this document looks nice!” it’s 100% because of Quarto.
I hope it also just demonstrates what I believe to be an effective way to wrangle data in Python, using Pandas method-chaining, Altair plots, black to automatically cleanup after myself
I want to be upfront and state that I am an advisor for both Linea and Nixtla. I think both companies are building cool technology for data science and data engineering, and using their packages is a good way for me to contribute as an advisor. If you have feedback about either package, please pass it along to me and I’ll make sure they get it!
from statsforecast.models import AutoARIMA
1. Download and clean up data
We are going to get data from the EIA’s website, which has weekly gas prices by region since 1993. They (sadly) make the data available in an XLS file, but fortunately Pandas can read this natively, and we can do almost all the needed cleanup in a few lines.
= requests.get("https://www.eia.gov/petroleum/gasdiesel/xls/pswrgvwall.xls")
response = pd.read_excel(
df
response.content,="Data 12",
sheet_name=0,
index_col=2,
skiprows=["Date"],
parse_dates
).rename(=lambda c: re.sub(
columns"\(PADD 1[A-C]\)",
"",
"Weekly ", "").replace(
c.replace(" All Grades All Formulations Retail Gasoline Prices (Dollars per Gallon)",
"",
),
).strip() )
Saving this dataframe in LineaPy makes it available as an artifact I can schedule to be re-created regularly.
"weekly_gas_price_data") lineapy.save(df,
LineaArtifact(name='weekly_gas_price_data', _version=0)
2. Tidy and explore the data
Our first step here is to tidy the data.
- convert from wide to long format
- rename columns
- clean up missing values
After that we’ll do some basic EDA.
= (
df_long
df.reset_index()=["Date"], var_name="region", value_name="price")
.melt(id_vars={"Date": "week"})
.rename(columns"region", "week"])
.sort_values([
.assign(# if we're missing one value, just use the last value
# (happens twice)
=lambda x: x["price"].combine_first(x.groupby("region")["price"].shift(1)),
price# we'll forecast log(price) and then transform
=lambda x: np.log(x["price"]),
log_price# percentage price changes are approximately the difference in log(price)
=lambda x: (
price_change"log_price"] - x.groupby("region")["log_price"].shift(1)
x[
),
)"price == price") # filter out NAs
.query(
)
"weekly_gas_price_data_long")
lineapy.save(df_long, df_long.head()
week | region | price | log_price | price_change | |
---|---|---|---|---|---|
28231 | 2003-05-26 | Boston, MA | 1.555 | 0.441476 | NaN |
28232 | 2003-06-02 | Boston, MA | 1.547 | 0.436318 | -0.005158 |
28233 | 2003-06-09 | Boston, MA | 1.534 | 0.427879 | -0.008439 |
28234 | 2003-06-16 | Boston, MA | 1.549 | 0.437610 | 0.009731 |
28235 | 2003-06-23 | Boston, MA | 1.544 | 0.434376 | -0.003233 |
Count how many non-missing values we have so we can get a sense of how long the time series are.
"region")["price"].count().reset_index().pipe(alt.Chart).encode(
df_long.groupby(=alt.X("price", title="Cases"), y=alt.Y("region", sort=alt.SortField("price"))
x ).mark_bar()
As time goes on we get prices for more regions. They gradually introduced more fine-grained geographies over time. By 2003 or so all regions are available.
"week")["price"].count().reset_index().pipe(alt.Chart).encode(
df_long.groupby(="week", y=alt.Y("price", title="Count")
x ).mark_line()
This plot shows the distribution of price changes. Clearly non-normal, it looks like a Laplace distribution. Interestingly the median is less than mean (which is positive).
"price_change"].mean() * 52 df_long[
0.043229833304911806
("price_change == price_change")
df_long.query(5000)
.sample(
.pipe(alt.Chart)"price_change")
.transform_density(="value:Q", y="density:Q")
.encode(x
.mark_area() )
= df_long["region"].unique().tolist()
all_regions "all_regions")
lineapy.save(all_regions, = len(all_regions)
num_regions num_regions
28
This is the smallest date for which we have data for all regions, which is useful as we want a matrix of complete cases.
= (
complete_case_date "week")["price"]
df_long.groupby(
.count()
.reset_index()f"price == {num_regions}")["week"]
.query(min()
."%Y-%m-%d")
).strftime( complete_case_date
'2003-05-26'
("region")["price_change"]
df_long.groupby(
.mean()
.reset_index()=lambda x: x["price_change"] * 52)
.assign(annual_price_change
.pipe(alt.Chart)
.encode(=alt.X("region", sort=alt.SortField("annual_price_change")),
x=alt.Y("annual_price_change", title="Annual Price Growth"),
y
)
.mark_bar() )
Matrix factorizations are fun
= (
wide f"week > '{complete_case_date}'")[["week", "region", "price_change"]]
df_long.query("week")
.set_index(="region", values="price_change")
.pivot(columns
)= wide.values
matrix print(matrix.shape)
= svd(matrix) u, d, v
(1009, 28)
= (
scree_plot "eigenvalue": d, "index": np.arange(d.shape[0])})
pd.DataFrame({
.pipe(alt.Chart)="index", y="eigenvalue")
.encode(x
.mark_point()
)
"scree_plot")
lineapy.save(scree_plot, scree_plot
= pd.DataFrame(
components =[f"component_{i}" for i in range(v.shape[0])], index=wide.columns
v, columns
).reset_index()
= (
components_plot
components.pipe(alt.Chart)="component_0", y="component_1", text="region")
.encode(x
.mark_text()
.interactive()
)
"components_plot")
lineapy.save(components_plot, components_plot
ACF plots: are price changes mean-reverting?
Here we check if changes in gas prices have any autocorrelation. If they do, it indicates there is some momentum: if prices are increasing they are likely to increase next week and we should fill up our tank as soon as possible. If there is no autocorrelation, there’s no real benefit to timing our gas purchases. If the autocorrelation is negative, we may want to wait awhile after prices increase to fill up.
Answer: looks like there’s strong positive autocorrelation lasting about 6 weeks. There does appear to be some mean reversion on a longer time-scale (1-2 quarters).
= "U.S."
region = (
auto_correlation f"region == '{region}'")
df_long.query("price_change == price_change")["price_change"]
.query(
.pipe(acf)
)= (
acf_plot "rho": auto_correlation, "lag": np.arange(auto_correlation.shape[0])})
pd.DataFrame({=region)
.pipe(alt.Chart, title="lag", y="rho")
.encode(x
.mark_bar()
)"acf_plot2")
lineapy.save(acf_plot, acf_plot
3. Make some forecasts
Note that I’m just going to hard-code these parameters here, and then use LineaPy turn them into proper parameters later. This allows me to quickly iterate on a single forecast interactively and then later productionize seamlessly.
= 13
H = 80
CI = 300
width = 250
height = "U.S."
region = "2022-10-02"
cutoff_date = "2022-01-01"
plot_start_date = f"{region} (as of {cutoff_date})" plot_title
= df_long.query(f"region == '{region}'")
region_df = region_df.query(f"week < '{cutoff_date}'")
train = AutoARIMA()
m_aa "log_price"].values) m_aa.fit(train[
AutoARIMA
= m_aa.predict(h=H, level=(CI,))
raw_forecast = {key: np.exp(value) for key, value in raw_forecast.items()}
raw_forecast_exp = pd.DataFrame(raw_forecast_exp).assign(
forecast =pd.date_range(train["week"].max(), periods=H, freq="W")
week+ pd.Timedelta("7 days")
)= pd.concat(
forecast
[
forecast,1)
train.tail(={"price": "mean"})
.rename(columns**{f"lo-{CI}": lambda x: x["mean"], f"hi-{CI}": lambda x: x["mean"]}),
.assign(
]
) forecast.head()
mean | lo-80 | hi-80 | week | region | log_price | price_change | |
---|---|---|---|---|---|---|---|
0 | 3.860496 | 3.785185 | 3.937306 | 2022-10-09 | NaN | NaN | NaN |
1 | 3.877999 | 3.741205 | 4.019795 | 2022-10-16 | NaN | NaN | NaN |
2 | 3.899743 | 3.705606 | 4.104051 | 2022-10-23 | NaN | NaN | NaN |
3 | 3.917323 | 3.665836 | 4.186063 | 2022-10-30 | NaN | NaN | NaN |
4 | 3.928728 | 3.622444 | 4.260909 | 2022-11-06 | NaN | NaN | NaN |
= (
uncertainty_plot =height, width=width)
forecast.pipe(alt.Chart, height
.encode(="week",
x=alt.Y(f"lo-{CI}", title="Price"),
y=alt.Y2(f"hi-{CI}", title="Price"),
y2
)=0.2)
.mark_area(opacity
)
= (
history_plot f"week >= '{plot_start_date}'")
region_df.query(=plot_title)
.pipe(alt.Chart, title=alt.X("week", title="Week"), y=alt.Y("price", title="Price"))
.encode(x
.mark_line()
)
= forecast.pipe(alt.Chart).encode(x="week", y="mean").mark_line()
forecast_plot
= (
cutoff_plot 1).pipe(alt.Chart).encode(x="week").mark_rule(strokeDash=[10, 2])
train.tail(
)
= uncertainty_plot + history_plot + forecast_plot + cutoff_plot
full_plot "gas_price_forecast") lineapy.save(full_plot,
LineaArtifact(name='gas_price_forecast', _version=0)
The forecast is kind of boring actually, the recent trend has been flat so it looks unlikely prices could climb as high as they were in June or even March.
full_plot
4. Build a pipeline
Parameter Refactor
Here is some magic. I can take the last plot I made and directly turn it into a function that makes the plot again – but where I can set new options about the parameters I specified!
I find this to be a helpful pattern – you can prototype in a procedural way that’s easy to read, and then let LineaPy clean up and turn it into something re-usable later.
= lineapy.get_function(
forecast_region "gas_price_forecast"],
[=[
input_parameters"region",
"cutoff_date",
"H",
"width",
"height",
"plot_start_date",
],=["weekly_gas_price_data_long"],
reuse_pre_computed_artifacts )
Let’s got back in time and look at a forecast from when prices hit their peak in California.
= forecast_region(
result ="California", cutoff_date="2022-06-07", H=15, width=300, height=250
region
)"gas_price_forecast"] result[
= []
plots for region in all_regions:
= forecast_region(
result =region, cutoff_date=cutoff_date, height=200, width=200
region
)"gas_price_forecast"]) plots.append(result[
= alt.vconcat()
chart for i, plot in enumerate(plots):
if i % 4 == 0:
= alt.hconcat()
row &= row
chart |= plot
row chart
Save the final artifact
One thing I can do is just save the final result here somewhere I may reference elsewhere. For instance, in another document I could get this chart and render it (which I do!).
"all_forecasts_plot") lineapy.save(chart,
LineaArtifact(name='all_forecasts_plot', _version=0)
Generate a runnable pipeline
Having kept track of everything we did to make our forecast plots, LineaPy can generate an entire Airflow pipeline that faithfully replicates what I’ve done in this notebook. None of the side-quest analyses or data checking will be included, and the packages I used (and their versions are captured. The one thing I do have to do is include a dependency graph between the different artifacts so that the DAG can reflect this.
LineaPy is useful locally, but to share my results with others and run pipelines in a different environment, it can also be hosted as in this tutorial.
lineapy.to_pipeline("gas_price_forecast", "weekly_gas_price_data", "weekly_gas_price_data_long"],
[={
dependencies"gas_price_forecast": {"weekly_gas_price_data_long": {"weekly_gas_price_data"}}
},="gas_price_forecast",
pipeline_name="pipeline",
output_dir="AIRFLOW",
framework=["region", "cutoff_date"],
input_parameters )
Generated module file: pipeline/gas_price_forecast_module.py
Generated requirements file: pipeline/gas_price_forecast_requirements.txt
Generated DAG file: pipeline/gas_price_forecast_dag.py
Generated Docker file: pipeline/gas_price_forecast_Dockerfile
PosixPath('pipeline')
!cat pipeline/gas_price_forecast_dag.py
import pathlib
import pickle
import gas_price_forecast_module
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
def dag_setup():
pickle_folder = pathlib.Path("/tmp").joinpath("gas_price_forecast")
if not pickle_folder.exists():
pickle_folder.mkdir()
def dag_teardown():
pickle_files = pathlib.Path("/tmp").joinpath("gas_price_forecast").glob("*.pickle")
for f in pickle_files:
f.unlink()
def task_weekly_gas_price_data():
df = gas_price_forecast_module.get_weekly_gas_price_data()
pickle.dump(df, open("/tmp/gas_price_forecast/variable_df.pickle", "wb"))
def task_weekly_gas_price_data_long():
df = pickle.load(open("/tmp/gas_price_forecast/variable_df.pickle", "rb"))
df_long = gas_price_forecast_module.get_weekly_gas_price_data_long(df)
pickle.dump(df_long, open("/tmp/gas_price_forecast/variable_df_long.pickle", "wb"))
def task_gas_price_forecast(cutoff_date, region):
cutoff_date = str(cutoff_date)
region = str(region)
df_long = pickle.load(open("/tmp/gas_price_forecast/variable_df_long.pickle", "rb"))
full_plot = gas_price_forecast_module.get_gas_price_forecast(
cutoff_date, df_long, region
)
pickle.dump(
full_plot, open("/tmp/gas_price_forecast/variable_full_plot.pickle", "wb")
)
default_dag_args = {
"owner": "airflow",
"retries": 2,
"start_date": days_ago(1),
"params": {"region": "U.S.", "cutoff_date": "2022-10-02"},
}
with DAG(
dag_id="gas_price_forecast_dag",
schedule_interval="*/15 * * * *",
max_active_runs=1,
catchup=False,
default_args=default_dag_args,
) as dag:
setup = PythonOperator(
task_id="dag_setup",
python_callable=dag_setup,
)
teardown = PythonOperator(
task_id="dag_teardown",
python_callable=dag_teardown,
)
weekly_gas_price_data = PythonOperator(
task_id="weekly_gas_price_data_task",
python_callable=task_weekly_gas_price_data,
)
weekly_gas_price_data_long = PythonOperator(
task_id="weekly_gas_price_data_long_task",
python_callable=task_weekly_gas_price_data_long,
)
gas_price_forecast = PythonOperator(
task_id="gas_price_forecast_task",
python_callable=task_gas_price_forecast,
op_kwargs={
"cutoff_date": "{{ params.cutoff_date }}",
"region": "{{ params.region }}",
},
)
weekly_gas_price_data >> weekly_gas_price_data_long
weekly_gas_price_data_long >> gas_price_forecast
setup >> weekly_gas_price_data
gas_price_forecast >> teardown
!cat pipeline/gas_price_forecast_module.py
import argparse
import re
import altair as alt
import numpy as np
import pandas as pd
import requests
from statsforecast.models import AutoARIMA
def get_weekly_gas_price_data():
response = requests.get(
"https://www.eia.gov/petroleum/gasdiesel/xls/pswrgvwall.xls"
)
df = pd.read_excel(
response.content,
sheet_name="Data 12",
index_col=0,
skiprows=2,
parse_dates=["Date"],
).rename(
columns=lambda c: re.sub(
"\(PADD 1[A-C]\)",
"",
c.replace("Weekly ", "").replace(
" All Grades All Formulations Retail Gasoline Prices (Dollars per Gallon)",
"",
),
).strip()
)
return df
def get_weekly_gas_price_data_long(df):
df_long = (
df.reset_index()
.melt(id_vars=["Date"], var_name="region", value_name="price")
.rename(columns={"Date": "week"})
.sort_values(["region", "week"])
.assign(
# if we're missing one value, just use the last value
# (happens twice)
price=lambda x: x["price"].combine_first(
x.groupby("region")["price"].shift(1)
),
# we'll forecast log(price) and then transform
log_price=lambda x: np.log(x["price"]),
# percentage price changes are approximately the difference in log(price)
price_change=lambda x: (
x["log_price"] - x.groupby("region")["log_price"].shift(1)
),
)
.query("price == price") # filter out NAs
)
return df_long
def get_gas_price_forecast(cutoff_date, df_long, region):
H = 13
CI = 80
width = 300
height = 250
plot_start_date = "2022-01-01"
plot_title = f"{region} (as of {cutoff_date})"
region_df = df_long.query(f"region == '{region}'")
train = region_df.query(f"week < '{cutoff_date}'")
m_aa = AutoARIMA()
m_aa.fit(train["log_price"].values)
raw_forecast = m_aa.predict(h=H, level=(CI,))
raw_forecast_exp = {key: np.exp(value) for key, value in raw_forecast.items()}
forecast = pd.DataFrame(raw_forecast_exp).assign(
week=pd.date_range(train["week"].max(), periods=H, freq="W")
+ pd.Timedelta("7 days")
)
forecast = pd.concat(
[
forecast,
train.tail(1)
.rename(columns={"price": "mean"})
.assign(
**{f"lo-{CI}": lambda x: x["mean"], f"hi-{CI}": lambda x: x["mean"]}
),
]
)
uncertainty_plot = (
forecast.pipe(alt.Chart, height=height, width=width)
.encode(
x="week",
y=alt.Y(f"lo-{CI}", title="Price"),
y2=alt.Y2(f"hi-{CI}", title="Price"),
)
.mark_area(opacity=0.2)
)
history_plot = (
region_df.query(f"week >= '{plot_start_date}'")
.pipe(alt.Chart, title=plot_title)
.encode(x=alt.X("week", title="Week"), y=alt.Y("price", title="Price"))
.mark_line()
)
forecast_plot = forecast.pipe(alt.Chart).encode(x="week", y="mean").mark_line()
cutoff_plot = (
train.tail(1).pipe(alt.Chart).encode(x="week").mark_rule(strokeDash=[10, 2])
)
full_plot = uncertainty_plot + history_plot + forecast_plot + cutoff_plot
return full_plot
def run_session_including_weekly_gas_price_data(
region="U.S.",
cutoff_date="2022-10-02",
):
# Given multiple artifacts, we need to save each right after
# its calculation to protect from any irrelevant downstream
# mutations (e.g., inside other artifact calculations)
import copy
artifacts = dict()
df = get_weekly_gas_price_data()
artifacts["weekly_gas_price_data"] = copy.deepcopy(df)
df_long = get_weekly_gas_price_data_long(df)
artifacts["weekly_gas_price_data_long"] = copy.deepcopy(df_long)
full_plot = get_gas_price_forecast(cutoff_date, df_long, region)
artifacts["gas_price_forecast"] = copy.deepcopy(full_plot)
return artifacts
def run_all_sessions(
region="U.S.",
cutoff_date="2022-10-02",
):
artifacts = dict()
artifacts.update(run_session_including_weekly_gas_price_data(region, cutoff_date))
return artifacts
if __name__ == "__main__":
# Edit this section to customize the behavior of artifacts
parser = argparse.ArgumentParser()
parser.add_argument("--region", type=str, default="U.S.")
parser.add_argument("--cutoff_date", type=str, default="2022-10-02")
args = parser.parse_args()
artifacts = run_all_sessions(
region=args.region,
cutoff_date=args.cutoff_date,
)
print(artifacts)
!cat pipeline/gas_price_forecast_requirements.txt
statsforecast==1.0.0
requests==2.28.0
re==2.2.1
altair==4.2.0
numpy==1.21.6
pandas==1.4.2
scipy==1.7.3
statsmodels==0.13.2