Simulating Past Betfair Markets

It's a common experience to design and backtest a model using a CSV dataset, only to find that its behaviour in production doesn't match the backtest. You might not be matched despite being at the front of the queue, or you might see odd, seemingly unexplainable behaviour in the markets.

Thankfully Flumine, the Python framework built on the Betfair API, natively supports a simulation mode that can play back the historic data files and simulate placing bets in the market. We ran through how to process a small set of markets in simulation mode in our How To Automate Series tutorials, but the simulation mode is so powerful that it deserves its own standalone tutorial!

Limitations

The historic data files and their use in simulation mode have some limitations compared to using the live API.

  • Market Catalogue is not included in the historic stream files, so things like Runner Metadata cannot be accessed from the stream files
  • Cross matching (Virtual Bets) is also not included in the historic stream files, though for racing this should have limited impact
  • Live markets react to changes in volume on the exchange, so there is no data on how any simulated bets would have affected other players in the market. This is particularly salient if you want to test a strategy that uses large bet sizes, targets high-odds selections, or relies on a trading model
  • This tutorial is geared towards Australian and New Zealand races, but it can be adapted for use in other countries with some minor changes.

This tutorial will utilise the PRO level files only and has not been tested on the BASIC or ADVANCED level files. Australian and New Zealand customers can reach out to us at data@betfair.com.au to gain access to the PRO data files.

Tutorial Outline

In this tutorial, we will:

  • Create a list of market ids from publicly available CSV files
  • Unzip the .tar files and decompress the stream files only for the markets we're interested in
  • Run the simulation on our racing win markets (Code will be for Thoroughbred markets but can easily be adapted for Harness or Greyhounds)
  • Process the output of listClearedOrders to understand where we might find an angle

The key thing we'll test here is entering the market at different points in time relative to the scheduled race start, to determine whether we can find better prices (both back and lay) depending on how far out from the off we enter the market. When it comes to stake size, be realistic: don't simulate $1,000 stakes if you would realistically only bet $10, as a live market would behave very differently.
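
As a rough illustration of that advice, you could cap a simulated stake against the volume visible at your price. The helper and the 10% cap below are purely illustrative assumptions, not part of this tutorial's strategy:

# Illustrative only: cap a simulated stake at a share of the visible volume
# so the bet stays realistic for the market's depth.
def realistic_stake(target_stake: float, available_size: float, cap: float = 0.1) -> float:
    return round(min(target_stake, available_size * cap), 2)

print(realistic_stake(10, 250))  # 10.0 - plenty of depth at the price
print(realistic_stake(10, 40))   # 4.0  - thin market, so scale the stake down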

Without further ado, let's jump straight into it!

Step 1 - Gather our markets and extract our files

Stream Files

  • You will need to have downloaded the tar files to your machine before starting
Import Libraries and Extract Files
import os
import glob
import shutil
import tarfile
import bz2
import pandas as pd
from datetime import datetime, timedelta

'''
Here we will specify the folder where we are storing our stream files and where we want to extract the files to
The code will check if the output folder exists, and will create one if it does not.
It will then check the folder and delete any files in the folder that are not tar or csv files
This deletion is to clear any previously extracted stream files and will be much faster than manually deleting them
'''

# Specify the directory where your stream files are stored and where you want to extract the files
source_folder = 'D:/Stream Files/Thoroughbreds/Australia'
output_folder = 'D:/Unzipped Stream Files'

# Define the start and end date for our timeline
end_date = datetime.today() - timedelta(days=30) # timedelta has no 'months' argument, so approximate one month as 30 days
start_date = datetime(2023,1,1) # The files used below don't go further back than this

# Ensure the output folder exists
os.makedirs(output_folder, exist_ok=True)

# Get a list of all files in the directory
files = glob.glob(os.path.join(output_folder, '*'))

# Loop through the files and delete any that are not .tar or .csv files
for file in files:
    if not file.endswith('.tar') and not file.endswith('.csv'):
        try:
            os.remove(file)
            print(f'Deleted: {file}')
        except Exception as e:
            print(f'Error deleting {file}: {e}')

'''
The following section will iterate over all files in your folder, so the options to optimize are:

    - Move the files you don't want to process to another folder
    - Copy the files you do want to process to a new folder
    - Use string matching to remove unwanted files from the tar_files list
    - Manually specify each file by typing out a list
'''

# Iterate over all .tar files in the source folder
tar_files = glob.glob(os.path.join(source_folder, '*.tar'))

# Remove any files for markets before 2023 (a list comprehension avoids mutating the list while iterating over it)
tar_files = [tar for tar in tar_files if '2023' in tar or '2024' in tar]

'''
The next section will be used to retrieve a list of win market ids from online csv files.
If you have the market_ids already in a csv file, use the below code instead

win_markets = pd.read_csv('marketIds.csv',dtype={'WIN_MARKET_ID' : str})
win_market_ids = win_markets['WIN_MARKET_ID'].tolist()
'''

def retrieve_betfair_markets(end_date, start_date):
    # List to store DataFrames
    result_dataframes = []
    # Define our date variable that will be changed
    current_date = start_date
    # Initialise the while loop to ensure we cover our whole date range
    while current_date <= end_date:
        # Generate the URL
        url = f'https://betfair-datascientists.github.io/data/assets/ANZ_Thoroughbreds_{current_date.year}_{current_date.month:02d}.csv'
        try:
            # Read CSV data into a DataFrame directly from the URL,
            # keeping the market ids as strings so they match the stream file names
            df = pd.read_csv(url, dtype={'WIN_MARKET_ID': str})
            result_dataframes.append(df)
            print(f"Processed: {url}")
        except Exception as e:
            print(f"Failed to fetch data from: {url}, Error: {e}")
        # Move to the next month
        current_date = current_date.replace(day=1) + pd.DateOffset(months=1)
    # Concatenate all DataFrames into one
    betfair_results = pd.concat(result_dataframes, ignore_index=True)
    # Keep only the win market ids
    win_markets=betfair_results['WIN_MARKET_ID'].unique().tolist()

    return win_markets

win_markets = retrieve_betfair_markets(end_date, start_date)

'''
The next code block here will iterate over each stream file and check if the market_id is in the list of our win market ids
Only the win markets will be extracted to the folder.
'''

for tar_path in tar_files:

    with tarfile.open(tar_path, 'r') as tar:
        # Iterate over each file in the tar archive
        for member in tar.getmembers():
            if member.name.endswith('.bz2'):
                # Extract the .bz2 file to a temporary location
                extracted_bz2_path = os.path.join(output_folder, os.path.basename(member.name))

                # Determine the final output path by removing the .bz2 extension
                final_output_path = extracted_bz2_path[:-4]
                # The file name (minus '.bz2') is the market id, e.g. '1.212345678'
                market_id = os.path.basename(member.name)[:-4]

                if market_id in win_markets:
                    with tar.extractfile(member) as extracted_file, open(extracted_bz2_path, 'wb') as temp_bz2_file:
                        shutil.copyfileobj(extracted_file, temp_bz2_file)

                    # Extract the .bz2 file to the final destination
                    with bz2.BZ2File(extracted_bz2_path, 'rb') as bz2_file, open(final_output_path, 'wb') as output_file:
                        shutil.copyfileobj(bz2_file, output_file)

                    # Remove the temporary .bz2 file
                    os.remove(extracted_bz2_path)

                    print(f'Extracted {member.name} to {final_output_path}')

Step 2 - Run the simulation

Running the simulation
# Import libraries
import os
import time
import logging
import csv
import pandas as pd
import math
import re
from pythonjsonlogger import jsonlogger
from flumine import FlumineSimulation, BaseStrategy, utils, clients
from flumine.order.trade import Trade
from flumine.order.order import LimitOrder
from flumine.order.ordertype import OrderTypes
from flumine.markets.market import Market
from flumine.controls.loggingcontrols import LoggingControl
from betfairlightweight.resources import MarketBook
from concurrent import futures 
from flumine.utils import price_ticks_away
from collections import OrderedDict
from tqdm import tqdm

# Logging
logger = logging.getLogger()
custom_format = "%(asctime) %(levelname) %(message)"
log_handler = logging.StreamHandler()
formatter = jsonlogger.JsonFormatter(custom_format)
formatter.converter = time.gmtime
log_handler.setFormatter(formatter)
logger.addHandler(log_handler)
logger.setLevel(logging.CRITICAL)

'''
This code file is designed to iterate over the folder with the previously unzipped win market stream files
and place simulated bets on selections at certain time points before the scheduled race start time

We will speed this process up using multi-threading
'''
# Specify the folder where the unzipped stream files are stored
source_folder = 'D:/Unzipped Stream Files'
output_folder = 'C:/Users/mitch/Documents/Scripts/Flumine-Simulations'

# Function to split our list into chunks
def split_list(lst, chunk_size):
    for i in range(0, len(lst), chunk_size):
        yield lst[i:i + chunk_size]
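
# For example (illustrative): list(split_list(['a', 'b', 'c', 'd', 'e'], 2))
# returns [['a', 'b'], ['c', 'd'], ['e']]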

# Function to extract the race distance from the market_name
def race_distance_split(market_book: MarketBook):
    # Define the variable market_name and use regex to extract the distance
    market_name = market_book.market_definition.name
    match = re.search(r'\s(\d+)m', market_name)
    distance = int(match.group(1))

    return distance

# Function to extract the race type from the market_name
def race_type_split(market_book: MarketBook):
    market_name = market_book.market_definition.name
    match = re.search(r'[^ ]+$',market_name)
    race_type = match.group(0)

    return race_type
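
# A quick sanity check of the two helpers on a hypothetical market name such as
# 'R1 1200m Mdn':
#   re.search(r'\s(\d+)m', 'R1 1200m Mdn').group(1)  -> '1200'
#   re.search(r'[^ ]+$', 'R1 1200m Mdn').group(0)    -> 'Mdn'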

# Defining our flumine class
class AusThoroughbredSimulation(BaseStrategy):

    '''
    Define an empty list called processed_selection_ids and accept a parameter called 'time_point'
    '''
    def __init__(self, *args, time_point,**kwargs):
        super().__init__(*args, **kwargs)
        self.processed_selection_ids = []
        self.time_point = time_point

    def check_market_book(self, market: Market, market_book: MarketBook) -> bool:
        # process_market_book is only executed if this returns True
        if market_book.status != "CLOSED":
            return True
        return False

    def process_market_book(self, market: Market, market_book: MarketBook) -> None:
        # Define the distance according to our function
        distance = race_distance_split(market_book)
        race_type = race_type_split(market_book)

        # If the time before the scheduled off matches our time_point and the market isn't in play, then place a bet
        if (round(market.seconds_to_start, 0) == self.time_point and not market_book.inplay):
            # Create an ordered dictionary for collecting our notes
            notes = OrderedDict()
            # Here we are collecting information about the market to write to our results files
            notes["venue"] = "venue:"+str(market_book.market_definition.venue)
            notes["race_distance"] = "race_distance:"+str(distance)
            notes["field_size"] = "field_size:"+str(market_book.number_of_active_runners)
            notes["race_type"] = "race_type:"+race_type
            notes["time_point"] = "seconds_before_scheduled_off:"+str(self.time_point)

            for runner in market_book.runners:

                # Check runner hasn't scratched and that first layer of back price and lay price exists
                if runner.status == "ACTIVE" and runner.ex.available_to_back and runner.ex.available_to_lay:
                    '''
                    The code below places a lay bet at the best available back price and a back bet at the best available lay price,
                    putting our orders at the back of the queue and leaving them in the market until it goes in-play,
                    at which point they will lapse.
                    After the simulations have been run, we will cut the files of cleared orders to find a profitable angle
                    '''
                    if runner.selection_id not in self.processed_selection_ids:
                        trade = Trade(
                            market_id=market_book.market_id,
                            selection_id=runner.selection_id,
                            handicap=runner.handicap,
                            notes=notes,
                            strategy=self,
                        )
                        order = trade.create_order(
                            side="LAY",
                            order_type=LimitOrder(
                                price=price_ticks_away(runner.ex.available_to_back[0]['price'], 0),
                                size=round((20 / (price_ticks_away(runner.ex.available_to_back[0]['price'], 0))), 2),
                                persistence_type="LAPSE"
                            )
                        )
                        market.place_order(order)

                        trade = Trade(
                            market_id=market_book.market_id,
                            selection_id=runner.selection_id,
                            handicap=runner.handicap,
                            notes=notes,
                            strategy=self,
                        )
                        order = trade.create_order(
                            side="BACK",
                            order_type=LimitOrder(
                                price=price_ticks_away(runner.ex.available_to_lay[0]['price'], 0),
                                size=round((20 / (price_ticks_away(runner.ex.available_to_lay[0]['price'], 0))), 2),
                                persistence_type="LAPSE"
                            )
                        )
                        market.place_order(order)
                        self.processed_selection_ids.append(runner.selection_id)
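
        # Note: price_ticks_away(price, 0) returns the price unchanged, so the orders
        # above simply join the queue at the current best back and lay prices. A
        # non-zero second argument would move the price along the Betfair tick ladder
        # (e.g. one tick up from 2.50 is 2.52 in that price band).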


# Fields we want to log in our simulations
FIELDNAMES = [
    "bet_id",
    "strategy_name",
    "market_id",
    "selection_id",
    "trade_id",
    "date_time_placed",
    "price",
    "price_matched",
    "size",
    "size_matched",
    "profit",
    "side",
    "elapsed_seconds_executable",
    "order_status",
    "market_note",
    "trade_notes",
    "order_notes",
]

# Class to define logging class and output results to csv files
class BacktestLoggingControl(LoggingControl):
    NAME = "BACKTEST_LOGGING_CONTROL"

    def __init__(self, time_point, *args, **kwargs):
        self.time_point = time_point
        super().__init__(*args, **kwargs)
        self._setup()

    def _setup(self):
        if os.path.exists(f"tho_simulation_{self.time_point}_seconds.csv"):
            logging.info("Results file exists")
        else:
            with open(f"tho_simulation_{self.time_point}_seconds.csv", "w") as m:
                csv_writer = csv.DictWriter(m, delimiter=",", fieldnames=FIELDNAMES)
                csv_writer.writeheader()

    def _process_cleared_orders_meta(self, event):
        orders = event.event
        with open(f"tho_simulation_{self.time_point}_seconds.csv", "a") as m:
            for order in orders:
                if order.order_type.ORDER_TYPE == OrderTypes.LIMIT:
                    size = order.order_type.size
                else:
                    size = order.order_type.liability
                if order.order_type.ORDER_TYPE == OrderTypes.MARKET_ON_CLOSE:
                    price = None
                else:
                    price = order.order_type.price
                try:
                    order_data = {
                        "bet_id": order.bet_id,
                        "strategy_name": order.trade.strategy,
                        "market_id": order.market_id,
                        "selection_id": order.selection_id,
                        "trade_id": order.trade.id,
                        "date_time_placed": order.responses.date_time_placed,
                        "price": price,
                        "price_matched": order.average_price_matched,
                        "size": size,
                        "size_matched": order.size_matched,
                        "profit": order.simulated.profit,
                        "side": order.side,
                        "elapsed_seconds_executable": order.elapsed_seconds_executable,
                        "order_status": order.status.value,
                        "market_note": order.trade.market_notes,
                        "trade_notes": order.trade.notes_str,
                        "order_notes": order.notes_str,
                    }
                    csv_writer = csv.DictWriter(m, delimiter=",", fieldnames=FIELDNAMES)
                    csv_writer.writerow(order_data)
                except Exception as e:
                    logger.error(
                        "_process_cleared_orders_meta: %s" % e,
                        extra={"order": order, "error": e},
                    )

        logger.info("Orders updated", extra={"order_count": len(orders)})


def run_process(chunk,time_point):  
    try:
        # Set Flumine to simulation mode
        client = clients.SimulatedClient(min_bet_validation=False)
        framework = FlumineSimulation(client=client)

        # Set parameters for our strategy
        strategy = AusThoroughbredSimulation(
            market_filter={
                "markets": chunk,  
                'market_types': ['WIN'],
                "listener_kwargs": {"inplay": False, "seconds_to_start": 610},  
            },
            max_order_exposure=100,
            max_selection_exposure=100,
            max_live_trade_count=2,
            max_trade_count=2,
            time_point = time_point
        )
        # Run our strategy on the simulated market
        framework.add_strategy(strategy)
        framework.add_logging_control(BacktestLoggingControl(time_point))
        framework.run()
    except Exception as e:
        logger.error(f"Error in run_process: {e}")

# Define our list of time points before the off where we want to try placing bets
time_points = [600, 480, 360, 300, 240, 180, 120, 60, 30, 0, -30]

# Iterate over our stream files using different time points
# (guard the multiprocessing code so the worker processes don't re-run it on import)
if __name__ == "__main__":

    for time_point in time_points:
        data_folder = source_folder
        data_files = os.listdir(data_folder)
        data_files = [f'{data_folder}/{path}' for path in data_files]
        '''
        Here we split our stream files into chunks of 1000 markets, so that each thread discards a chunk once its list is complete.
        Not splitting the list into chunks will cause memory allocation issues and the process will crash.
        When iterating over a small number of markets, the chunking is not required.
        '''
        chunks = list(split_list(data_files, 1000))

        print(f'Processing simulation for {time_point} seconds before the off')
        # Iterate over each chunk and print a progress bar using tqdm
        for chunk_index, chunk in enumerate(tqdm(chunks, desc="Chunks Progress")):

            all_markets = chunk
            processes = os.cpu_count()
            markets_per_process = 8  # Optimal to prevent memory leakage

            _process_jobs = []

            with futures.ProcessPoolExecutor(max_workers=processes) as p:
                # Cap each job at markets_per_process markets
                subset = min(markets_per_process, math.ceil(len(all_markets) / processes))
                for m in utils.chunks(all_markets, subset):
                    _process_jobs.append(
                        p.submit(run_process, m, time_point)
                    )
                for job in futures.as_completed(_process_jobs):
                    job.result()

def process_csv_file(file_path):
    df = pd.read_csv(file_path, on_bad_lines='skip',low_memory=False)

    df.dropna(subset=['trade_notes'],inplace=True)

    # Process market_id column: strip the '1.' prefix and pad with trailing zeros
    # (read_csv parses the ids as floats, which drops any trailing zeros)
    df['market_id'] = df['market_id'].astype(str).str.replace('1.', '', regex=False).str.ljust(9, '0')

    # Split the trade_notes column into five new columns
    df[['venue','race_distance','field_size','race_type','seconds_before_scheduled_off']] = df['trade_notes'].str.split(',', expand=True)
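
    # A trade_notes string looks like this (the venue is illustrative):
    # 'venue:Flemington,race_distance:1200,field_size:10,race_type:Mdn,seconds_before_scheduled_off:600'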

    # Remove column names from the strings in each row
    df['venue'] = df['venue'].str.replace('venue:', '', regex=False)
    df['race_distance'] = df['race_distance'].str.replace('race_distance:', '', regex=False).astype(int)
    df['field_size'] = df['field_size'].str.replace('field_size:', '', regex=False).astype(int)
    df['race_type'] = df['race_type'].str.replace('race_type:', '', regex=False)
    df['seconds_before_scheduled_off'] = df['seconds_before_scheduled_off'].str.replace('seconds_before_scheduled_off:', '', regex=False).astype(int)

    # Process date_time_placed column
    df['date_time_placed'] = pd.to_datetime(df['date_time_placed'], dayfirst=True,format='mixed')
    df['date_time_placed'] = df['date_time_placed'].dt.tz_localize('UTC', ambiguous=False).dt.tz_convert('Australia/Melbourne')

    # Select relevant columns
    columns_to_keep = [
        'date_time_placed', 'venue', 'race_distance', 'race_type', 'field_size','seconds_before_scheduled_off',
        'market_id', 'selection_id', 'side', 'price', 'price_matched', 'size',
        'size_matched', 'elapsed_seconds_executable', 'profit'
    ]
    df = df[columns_to_keep]

    # Save the modified DataFrame to a new CSV file
    new_file_path = os.path.join(os.path.dirname(file_path), 'processed_' + os.path.basename(file_path))
    df.to_csv(new_file_path, index=False)

def process_all_files_in_folder(source_folder):
    for filename in os.listdir(source_folder):
        if filename.startswith('tho_simulation') and filename.endswith('.csv'):
            print(filename)
            file_path = os.path.join(source_folder, filename)
            process_csv_file(file_path)

# Process all our simulation files
process_all_files_in_folder(output_folder)

Analysis

Now that we've run our simulations, let's look at the output.

Let's load up a subselection of the bets: BACKING every selection in HANDICAP races at a price of less than $10. The graphs below show cumulative profit by matched price at each entry point.
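
As a rough guide, here's a minimal pandas/matplotlib sketch of how such a view might be built from the processed files. The file name and the 'Hcap' race-type token are assumptions; inspect df['race_type'].unique() to see which tokens your markets actually produce.

import pandas as pd
import matplotlib.pyplot as plt

# Load one of the processed simulation files (file name is an assumption)
df = pd.read_csv('processed_tho_simulation_600_seconds.csv')

# Filter: BACK bets in handicap races, matched at under $10
# ('Hcap' is an assumed race_type token)
bets = df[(df['side'] == 'BACK') &
          (df['race_type'].str.contains('Hcap', case=False, na=False)) &
          (df['price_matched'] < 10) &
          (df['size_matched'] > 0)].copy()

# Cumulative profit ordered by matched price
bets = bets.sort_values('price_matched')
bets['cumulative_profit'] = bets['profit'].cumsum()

bets.plot(x='price_matched', y='cumulative_profit', legend=False)
plt.xlabel('Price matched')
plt.ylabel('Cumulative profit ($)')
plt.title('BACK bets, handicap races, price < $10')
plt.show()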

[Charts: cumulative profit by matched price at 10 minutes, 5 minutes, 3 minutes and 0 minutes before the scheduled off]

The shapes of the graphs suggest that the odds range in which there is value narrows as the race start approaches, with value very much concentrated in the $3.50-$4.50 range. By the scheduled off (0 seconds), however, this peak has almost completely disappeared, which could indicate there's no value left in the market.

This is, of course, a very basic and narrow view of the dataset. There are many other ways to cut it, including by field size, race distance, track or price movement. You could also join it to an additional dataset, like the Betfair Hub Prediction Model or one containing trainer, barrier or jockey data, to determine whether the extra information could prove more profitable.


Disclaimer

Note that whilst models and automated strategies are fun and rewarding to create, we can't promise that your model or betting strategy will be profitable, and we make no representations in relation to the code shared or information on this page. If you're using this code or implementing your own strategies, you do so entirely at your own risk and you are responsible for any winnings/losses incurred. Under no circumstances will Betfair be liable for any loss or damage you suffer.