
Take away the pain! Processing TAR Files 101


So even though we've got some great tutorials up here on our Automation Hub, a common wish from many of our automation customers is for a simple plug-and-play tutorial and script for parsing the PRO TAR files into CSVs. So here it is: an easy-to-follow tutorial that draws on existing tutorials by Tom Bishop and others to easily and painlessly process these JSON files into CSVs ready to be modelled.

And don't worry, this tutorial has plenty of error-handling built in so (hopefully) you won't wake up in the morning to find the script failed at 1AM while you were dreaming of the final straight at Moonee Valley.

This tutorial concerns itself primarily with racing; however, at the bottom of the page we've also included complete code suited to parsing sports TAR files, including handling for line/handicap markets.

So without further ado, let's jump into it!

Cheat sheet

  • This is presented as a Jupyter notebook because that format is interactive and lets you run snippets of code from within the notebook. To use this functionality you'll need to download a copy of the ipynb file locally and open it in a text editor (e.g. VS Code).
  • If you're looking for the complete code, head to the bottom of the page or download the script from GitHub.
  • To run the code, save it to your machine, open a command prompt or a terminal in your text editor of choice (we're using VS Code), make sure you've navigated in the terminal to the folder you've saved the script in, then type py main.py (or whatever you've called your script file if not main) and hit enter. To stop the code running use Ctrl+C.
  • Make sure you amend your data path to point to your data file. We'll be taking as input a historical TAR file downloaded from the Betfair historic data site. We're using a PRO version, though the code should work on ADVANCED too. This approach won't work with the BASIC data tier. For more information on these files please reach out to us at automation
  • We're using the betfairlightweight and betfair_data packages to do the heavy lifting
  • We've also posted the completed code logic on the betfair-downunder GitHub repo.

1.0 Setup

1.1 Importing libraries

Once again I'll be presenting the analysis in a Jupyter notebook and will be using Python as the programming language.

You'll need betfairlightweight and betfair_data, which you can install with pip install betfairlightweight betfair_data.

NOTE: If your editor warns that "Import 'betfair_data.bflw' could not be resolved from source", this is a known issue; the script should still run.

import pandas as pd
import numpy as np
import os
import csv
import tarfile
import zipfile
import bz2
import glob
import ast
import functools
import warnings
from unittest.mock import patch
from typing import List, Tuple
from itertools import zip_longest

import betfairlightweight
from betfairlightweight import StreamListener
from betfair_data import bflw
from betfair_data import PriceSize
from currency_converter import CurrencyConverter

from pandas.errors import SettingWithCopyWarning
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)

1.2 Picking our intervals

Here we'll define our working folder and the time intervals at which we want to pull our data. We've set the script to pull prices at 30 second intervals from 10 minutes before the scheduled jump, and then at 5 second intervals from 1 minute out. Feel free to change these as you need.

PRO TIP: copying your file path from Windows Explorer works well as long as you replace the backslashes with forward slashes

file_directory= '' #INSERT FILE DIRECTORY WHERE TAR FILES ARE STORED

log1_Start = 60 * 10 # Seconds before scheduled off to start recording data for data segment one
log1_Step = 30       # Seconds between log steps for first data segment
log2_Start = 60 * 1  # Seconds before scheduled off to start recording data for data segment two
log2_Step = 5    # Seconds between log steps for second data segment
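
To make the cadence concrete, here's a minimal sketch (using the settings above) of which wait applies at a given number of seconds before the scheduled off - it mirrors the time-step logic used later in loop_prices:

# minimal sketch of the logging cadence implied by the settings above
for seconds_to_start in [720, 400, 90, 30]:
    if seconds_to_start > log1_Start:
        cadence = None               # more than 10 minutes out - nothing logged
    elif seconds_to_start > log2_Start:
        cadence = log1_Step          # 10 minutes to 1 minute out - every 30 seconds
    else:
        cadence = log2_Step          # final minute - every 5 seconds
    print(seconds_to_start, cadence) # 720 None, 400 30, 90 30, 30 5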

1.3 Filtering the markets

We'll then define a market filter as well as a way to filter out the harness racing markets. The different filter variables are listed below and can be added or removed as required. Some of these aren't that useful but have been listed for the sake of completeness.

# splitting race name and returning the parts 
def split_anz_horse_market_name(market_name: str) -> Tuple[str, str, str]:
    # return race no, length, race type
    # input samples: 
    # 'R6 1400m Grp1' -> ('R6','1400m','grp1')
    # 'R1 1609m Trot M' -> ('R1', '1609m', 'trot')
    # 'R4 1660m Pace M' -> ('R4', '1660m', 'pace')
    parts = market_name.split(' ')
    race_no = parts[0] 
    race_len = parts[1].split('m')[0]
    race_type = parts[2].lower() 
    return (race_no, race_len, race_type)

# filtering markets to those that fit the following criteria
def filter_market(market: bflw.MarketBook) -> bool: 
    d = market.market_definition
    return (d is not None
        and d.country_code == 'AU' 
        and d.market_type == 'WIN'
        and (c := split_anz_horse_market_name(d.name)[2]) != 'trot' and c != 'pace' #strips out Harness Races
        #and (c := split_anz_horse_market_name(d.name)[2]) == 'hcap' #can use this to filter by race type
        #and int(split_anz_horse_market_name(d.name)[1]) >= 1200 #can use this to filter by race length (int conversion for a numeric comparison)
        )

# Simply add the relevant variable below to the market filter function above, along with the filter value
# Equals (== 'Value in quotes' or True/False/None), Does Not Equal (!= 'Value in quotes' or True/False/None) - FOR ALL TYPES
# Greater than (>), Greater than or equal to (>=), Less than (<), Less than or equal to (<=) - FOR INT/FLOAT
# For a list of values use 'in'

# and d.betting_type: str - ODDS, ASIAN_HANDICAP_SINGLES, ASIAN_HANDICAP_DOUBLES or LINE
# and d.bsp_market: bool - True, False
# and d.country_code: str - list of codes can be found here: https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2 - Default value is 'GB' - Australia = 'AU', New Zealand = 'NZ'
# and d.event_id: str - PARENT_EVENT_ID
# and d.event_name: Optional[str] - Usually the name of the Match-Up (e.g. Bangladesh v Sri Lanka) or Race Meeting Name (e.g. Wangaratta (AUS) 1st Dec) - Note: Dictionaries don't support wildcard searches
# and d.event_type_id: str - SportID [Horse Racing - 7, Greyhounds - 4339]
# and d.market_base_rate: float - Market Commission Rate
# and d.market_type: str - e.g. "WIN"
# and d.name: Optional[str] - market name (e.g. R1 1170m Mdn)
# and d.number_of_active_runners: int - number of horses/dogs in the race
# and d.number_of_winners: int - Win market 1, Place markets 2+
# and d.turn_in_play_enabled: bool - True, False
# and d.venue: Optional[str] - Racing Only - Track
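
As a worked example of combining these variables, below is a hedged sketch of a filter for Australian greyhound win markets - the event_type_id value is taken from the SportID list above:

# example only: a filter for Australian greyhound WIN markets
def filter_greyhound_market(market: bflw.MarketBook) -> bool:
    d = market.market_definition
    return (d is not None
        and d.event_type_id == '4339' # Greyhounds, per the SportID list above
        and d.country_code == 'AU'
        and d.market_type == 'WIN'
        )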

1.4 Utility Functions

Here we'll define our trading client and listener, which can handle the JSON file format, as well as some more working paths. We've also defined a few functions to generate the required output.

NOTE: Real login credentials are not required here; the client is only used to parse the local files.

trading = betfairlightweight.APIClient(username = "username", password = "password", app_key="app_key")
listener = StreamListener(max_latency=None)

stream_files = glob.glob(file_directory+"*.tar") 
selection_meta = file_directory+"metadata.csv"
prices_path =  file_directory+"preplay.csv"

# rounding to 2 decimal places or returning '' if blank
def as_str(v) -> str:
    return '%.2f' % v if (type(v) is float) or (type(v) is int) else v if type(v) is str else ''

# returning smaller of two numbers where min not 0
def min_gr0(a: float, b: float) -> float:
    if a <= 0:
        return b
    if b <= 0:
        return a

    return min(a, b)
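
min_gr0 is used later to estimate BSP matched volume as the smaller of the back stake taken and the lay liability divided by (BSP - 1). A quick worked example with made-up numbers:

# worked example: $150 back stake taken, $300 lay liability taken, BSP of 4.0
# the lay side only supports 300 / (4.0 - 1) = $100 of back stake
print(min_gr0(150, 300 / (4.0 - 1))) # 100.0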

# parsing price data and pulling out weighted avg price, matched, min price and max price
def parse_traded(traded: List[PriceSize]) -> Tuple[float, float, float, float]:
    if len(traded) == 0: 
        return (None, None, None, None)

    (wavg_sum, matched, min_price, max_price) = functools.reduce(
        lambda total, ps: (
            total[0] + (ps.price * ps.size), # wavg_sum before we divide by total matched
            total[1] + ps.size, # total matched
            min(total[2], ps.price), # min price matched
            max(total[3], ps.price), # max price matched
        ),
        traded,
        (0, 0, 1001, 0) # starting default values
    )

    wavg_sum = (wavg_sum / matched) if matched > 0 else None # dividing sum of wavg by total matched
    matched = matched if matched > 0 else None 
    min_price = min_price if min_price != 1001 else None
    max_price = max_price if max_price != 0 else None

    return (wavg_sum, matched, min_price, max_price)
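
For illustration, here's what parse_traded returns for a small hypothetical traded ladder:

# hypothetical traded ladder: $50 matched at 3.0 and $150 matched at 3.2
ladder = [PriceSize(price=3.0, size=50), PriceSize(price=3.2, size=150)]
print(parse_traded(ladder)) # (3.15, 200, 3.0, 3.2) - wavg, matched, min, max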


def load_markets(file_paths):
    for file_path in file_paths:
        print(file_path)
        print("__ Parsing Detailed Prices ___ ")
        if os.path.isdir(file_path):
            for path in glob.iglob(file_path + '**/**/*.bz2', recursive=True):
                f = bz2.BZ2File(path, 'rb')
                yield f
                f.close()
        elif os.path.isfile(file_path):
            ext = os.path.splitext(file_path)[1]
            # iterate through a tar archive
            if ext == '.tar':
                with tarfile.TarFile(file_path) as archive:
                    for file in archive:
                        yield bz2.open(archive.extractfile(file))
            # or a zip archive
            elif ext == '.zip':
                with zipfile.ZipFile(file_path) as archive:
                    for file in archive.namelist():
                        yield bz2.open(archive.open(file))

    return None

# returning the price at position n of a price ladder, or '' if the ladder is shorter
def slicePrice(l, n):
    try:
        x = l[n].price
    except IndexError:
        x = ""
    return(x)

# returning the size at position n of a price ladder, or '' if the ladder is shorter
def sliceSize(l, n):
    try:
        x = l[n].size
    except IndexError:
        x = ""
    return(x)

# pulling the top n prices and volumes from an available-to-back/lay ladder into a dict
def pull_ladder(availableLadder, n = 5):
    out = {}
    price = []
    volume = []
    if len(availableLadder) == 0:
        return(out)
    else:
        for rung in availableLadder[0:n]:
            price.append(rung.price)
            volume.append(rung.size)

        out["p"] = price
        out["v"] = volume
        return(out)
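
To show the shape of the output, here's a hypothetical call - the resulting dictionary is what gets stringified into the atb_ladder and atl_ladder columns of the CSV:

# hypothetical available-to-back ladder with two rungs
ladder = [PriceSize(price=2.0, size=120.0), PriceSize(price=1.98, size=80.0)]
print(pull_ladder(ladder, n = 5)) # {'p': [2.0, 1.98], 'v': [120.0, 80.0]}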

def final_market_book(s):

    with patch("builtins.open", lambda f, _: f):

        gen = s.get_generator()

        last_market_book = None

        for market_books in gen():

            # Check if this market book meets our market filter ++++++++++++++++++++++++++++++++++
            if not filter_market(market_books[0]):
                return(None)

            for market_book in market_books:
                last_market_book = market_book

        return(last_market_book)

1.5 Parsing Timestamped Prices

Next up are the functions that enable us to read the timestamped prices from the JSON files and output them ready to be joined onto the metadata. This will provide:

  • Market ID
  • Number of Active Runners in the market (this can help in the event of late scratchings)
  • Timestamp (will help us later to calculate time relative to scheduled off time)
  • Traded Volume on the selection so far
  • Weighted Average Price
  • Last Traded Price
  • Selection Status (this can help in the event of late scratchings)
  • Reduction Factor (effect on the market if the runner were to be scratched)
  • Back Ladder (pulls 5 best prices and volume available)
  • Lay Ladder (pulls 5 best prices and volume available)
  • BSP Near Price
  • BSP Far Price

NOTE: Projected BSP displayed on the app/website is the Near Price - more information about Near/Far Prices and their accuracy can be found here

NOTE: The function is defined to stop pulling prices once the market goes in-play. If in-play prices are required, simply remove the condition on market_book.inplay. However, you will then need to insert an in-play flag so the CSV can distinguish when the market has gone in play.
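
A minimal sketch of that change, assuming the rest of loop_prices stays as-is - drop the in-play break, then append a flag to each row (and a matching column to the header in parse_prices):

# sketch only: helper to append an in-play flag to an output row
# (remember to add a matching "inplay" column to the header row in parse_prices)
def row_with_inplay_flag(market_book, base_row: tuple) -> tuple:
    return base_row + (market_book.inplay,)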

def loop_prices(s, o):

    with patch("builtins.open", lambda f, _: f):

        gen = s.get_generator()

        marketID = None
        tradeVols = None
        time = None

        for market_books in gen():

            # Check if this market book meets our market filter ++++++++++++++++++++++++++++++++++

            if not filter_market(market_books[0]):
                break

            for market_book in market_books:

                # Time Step Management ++++++++++++++++++++++++++++++++++

                if marketID is None:

                    # No market initialised
                    marketID = market_book.market_id
                    time =  market_book.publish_time

                elif market_book.inplay:

                    # Stop once market inplay
                    break

                else:

                    seconds_to_start = (market_book.market_definition.market_time - market_book.publish_time).total_seconds()

                    if seconds_to_start > log1_Start:

                        # Too early before off to start logging prices
                        continue

                    else:

                        # Update data at different time steps depending on seconds to off
                        wait = log2_Step if seconds_to_start <= log2_Start else log1_Step

                        # New Market
                        if market_book.market_id != marketID:
                            marketID = market_book.market_id
                            time =  market_book.publish_time
                        # (wait) seconds elapsed since last write
                        elif (market_book.publish_time - time).total_seconds() > wait:
                            time = market_book.publish_time
                        # fewer than (wait) seconds elapsed continue to next loop
                        else:
                            continue

                # Execute Data Logging ++++++++++++++++++++++++++++++++++

                for runner in market_book.runners:

                    try:
                        selection_status = runner.status
                        reduction_factor = runner.adjustment_factor
                        atb_ladder = pull_ladder(runner.ex.available_to_back, n = 5)
                        atl_ladder = pull_ladder(runner.ex.available_to_lay, n = 5)
                        spn = runner.sp.near_price
                        spf = runner.sp.far_price
                    except AttributeError:
                        # runner attributes (e.g. sp/ex) missing - fall back to blanks
                        selection_status = None
                        reduction_factor = None
                        atb_ladder = {}
                        atl_ladder = {}
                        spn = None
                        spf = None

                    # Calculate Current Traded Volume + Traded WAP
                    limitTradedVol = sum([rung.size for rung in runner.ex.traded_volume])
                    if limitTradedVol == 0:
                        limitWAP = ""
                    else:
                        limitWAP = sum([rung.size * rung.price for rung in runner.ex.traded_volume]) / limitTradedVol
                        limitWAP = round(limitWAP, 2)

                    o.writerow(
                        (
                            market_book.market_id,
                            market_book.number_of_active_runners,
                            runner.selection_id,
                            market_book.publish_time,
                            limitTradedVol,
                            limitWAP,
                            runner.last_price_traded or "",
                            selection_status,
                            reduction_factor,
                            str(atb_ladder).replace(' ',''), 
                            str(atl_ladder).replace(' ',''),
                            str(spn),
                            str(spf)
                        )
                    )


def parse_prices(dir, out_file):

    with open(out_file, "w+") as output:

        writer = csv.writer(
            output, 
            delimiter=',',
            lineterminator='\r\n',
            quoting=csv.QUOTE_ALL
        )

        writer.writerow(("market_id","active_runners","selection_id","time","traded_volume","wap","ltp","selection_status",'reduction_factor',"atb_ladder","atl_ladder","sp_near","sp_far"))

        for file_obj in load_markets(dir):

            stream = trading.streaming.create_historical_generator_stream(
                file_path=file_obj,
                listener=listener,
            )

            loop_prices(stream, writer)

1.6 The Good Stuff

Now comes the really meaty chunk of code that will do the grunt work for us. We'll generate the timestamped prices for each TAR file separately and then loop over the file again to generate the selection metadata. (This reuses a chunk of code from our JSON to CSV Revisited tutorial built on the betfair_data package, which is super speedy because it's mainly written in Rust rather than Python.) In our metadata files we will provide:

  • Market ID
  • Market Time
  • Country Code
  • Track/Venue
  • Market Name
  • Selection ID
  • Horse Name
  • Result
  • BSP
  • Preplay Minimum Matched Price
  • Preplay Maximum Matched Price
  • Preplay Weighted Average Matched Price
  • Preplay Last Traded Price
  • Preplay Matched Volume (not including SP volume)
  • BSP Matched Volume (Back Stake)
  • Inplay Minimum Matched Price
  • Inplay Maximum Matched Price
  • Inplay Weighted Average Matched Price
  • Inplay Last Traded Price
  • Inplay Matched Volume

Note: For markets that don't go in play (like Greyhounds/Place markets), the 'Inplay' fields will be empty.

Following this metadata generation, we'll join the dataframes together and add the "seconds to scheduled off" field. We'll also convert the GMT timestamps in the TAR files to Melbourne time (useful because markets scheduled before 10am local time, or 11am during daylight saving, have an event date that falls on the previous day in GMT) and convert the currency to Australian dollars using a historical conversion rate.
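
For instance, here's a minimal sketch of the timezone conversion in isolation, using a made-up timestamp:

# minimal sketch: convert a UTC timestamp to Melbourne time, then drop the timezone
ts = pd.Series(pd.to_datetime(['2022-11-01 23:30:00'])) # 11:30pm GMT on 1 Nov
ts = ts.dt.tz_localize('UTC').dt.tz_convert('Australia/Melbourne').dt.tz_localize(None)
print(ts[0]) # 2022-11-02 10:30:00 - the local event date falls on the next day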

The final piece is outputting the files to CSV. The code is set up to output a separate file for each race, which keeps the number of rows per file small and makes the output easy to open and check for completeness. Changing this to output one CSV file per TAR file or one CSV file per day is fairly straightforward, as sketched below.
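
For example, here's a hedged sketch of the one-file-per-day variant, which would replace the per-market loop below by grouping the merged dataframe on event date instead:

# sketch only: write one CSV per day instead of one per market
# assumes df is the merged dataframe built in the code below
for day, day_data in df.groupby(df['event_date'].dt.date):
    day_data.to_csv(file_directory + str(day) + '.csv', index=False)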

#loop over each TAR file
for tar in stream_files:
    parse_prices([tar], prices_path) #This is where the timestamped prices are generated
    print("__ Parsing Market and Selection Data ___ ")

    # record prices to a file
    with open(selection_meta, "w") as output:
        # defining column headers
        output.write("market_id,event_date,country,track,event_name,selection_id,selection_name,result,bsp,pp_min,pp_max,pp_wap,pp_ltp,pp_volume,bsp_volume,ip_min,ip_max,ip_wap,ip_ltp,ip_volume\n")

        for i, g in enumerate(bflw.Files([tar])):
            print("Market {}".format(i), end='\r')

            def get_pre_post_final():
                eval_market = None
                prev_market = None
                preplay_market = None
                postplay_market = None       

                for market_books in g:
                    for market_book in market_books:
                        # if market doesn't meet filter return out
                        if eval_market is None and ((eval_market := filter_market(market_book)) == False):
                            return (None, None, None)

                        # final market view before market goes in play
                        if prev_market is not None and prev_market.inplay != market_book.inplay:
                            preplay_market = prev_market

                        # final market view at the conclusion of the market
                        if prev_market is not None and prev_market.status == "OPEN" and market_book.status != prev_market.status:
                            postplay_market = market_book

                        # update reference to previous market
                        prev_market = market_book

                return (preplay_market, postplay_market, prev_market) # prev is now final

            (preplay_market, postplay_market, final_market) = get_pre_post_final()

            # no price data for market
            if postplay_market is None:
                continue

            preplay_traded = [ (r.last_price_traded, r.ex.traded_volume) for r in preplay_market.runners ] if preplay_market is not None else None

            postplay_traded = [ (
                r.last_price_traded,
                r.ex.traded_volume,
                # calculating SP traded vol as smaller of back_stake_taken or (lay_liability_taken / (bsp - 1))        
                min_gr0(
                    next((pv.size for pv in r.sp.back_stake_taken if pv.size > 0), 0),
                    next((pv.size for pv in r.sp.lay_liability_taken if pv.size > 0), 0)  / ((r.sp.actual_sp if (type(r.sp.actual_sp) is float) or (type(r.sp.actual_sp) is int) else 0) - 1)
                ) if r.sp.actual_sp is not None else 0,
            ) for r in postplay_market.runners ]

            runner_data = [
            {
                'selection_id': r.selection_id,
                'selection_name': next((rd.name for rd in final_market.market_definition.runners if rd.selection_id == r.selection_id), None),
                'selection_status': r.status,
                'sp': as_str(r.sp.actual_sp),
            }
            for r in final_market.runners 
            ]

            # runner price data for markets that go in play
            if preplay_traded is not None:
                def runner_vals(r):
                    (pre_ltp, pre_traded), (post_ltp, post_traded, sp_traded) = r

                    inplay_only = list(filter(lambda ps: ps.size > 0, [
                        PriceSize(
                            price=post_ps.price, 
                            size=post_ps.size - next((pre_ps.size for pre_ps in pre_traded if pre_ps.price == post_ps.price), 0)
                        )
                        for post_ps in post_traded 
                    ]))

                    (ip_wavg, ip_matched, ip_min, ip_max) = parse_traded(inplay_only)
                    (pre_wavg, pre_matched, pre_min, pre_max) = parse_traded(pre_traded)

                    return {
                        'preplay_ltp': as_str(pre_ltp),
                        'preplay_min': as_str(pre_min),
                        'preplay_max': as_str(pre_max),
                        'preplay_wavg': as_str(pre_wavg),
                        'preplay_matched': as_str(pre_matched or 0),
                        'bsp_matched': as_str(sp_traded or 0),
                        'inplay_ltp': as_str(post_ltp),
                        'inplay_min': as_str(ip_min),
                        'inplay_max': as_str(ip_max),
                        'inplay_wavg': as_str(ip_wavg),
                        'inplay_matched': as_str(ip_matched),
                    }

                runner_traded = [ runner_vals(r) for r in zip_longest(preplay_traded, postplay_traded, fillvalue=PriceSize(0, 0)) ]

            # runner price data for markets that don't go in play
            else:
                def runner_vals(r):
                    (ltp, traded, sp_traded) = r
                    (wavg, matched, min_price, max_price) = parse_traded(traded)

                    return {
                        'preplay_ltp': as_str(ltp),
                        'preplay_min': as_str(min_price),
                        'preplay_max': as_str(max_price),
                        'preplay_wavg': as_str(wavg),
                        'preplay_matched': as_str(matched or 0),
                        'bsp_matched': as_str(sp_traded or 0),
                        'inplay_ltp': '',
                        'inplay_min': '',
                        'inplay_max': '',
                        'inplay_wavg': '',
                        'inplay_matched': '',
                    }

                runner_traded = [ runner_vals(r) for r in postplay_traded ]

            # printing to csv for each runner
            for (rdata, rprices) in zip(runner_data, runner_traded):
                # defining data to go in each column
                output.write(
                    "{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}\n".format(
                        postplay_market.market_id,
                        postplay_market.market_definition.market_time,
                        postplay_market.market_definition.country_code,
                        postplay_market.market_definition.venue,
                        postplay_market.market_definition.name,
                        rdata['selection_id'],
                        rdata['selection_name'],
                        rdata['selection_status'],
                        rdata['sp'],
                        rprices['preplay_min'],
                        rprices['preplay_max'],
                        rprices['preplay_wavg'],
                        rprices['preplay_ltp'],
                        rprices['preplay_matched'],
                        rprices['bsp_matched'],
                        rprices['inplay_min'],
                        rprices['inplay_max'],
                        rprices['inplay_wavg'],
                        rprices['inplay_ltp'],
                        rprices['inplay_matched'],
                    )
                )


    #loading selection file and parsing dates
    selection = pd.read_csv(selection_meta, dtype={'market_id': object, 'selection_id': object}, parse_dates = ['event_date'])

    #loading price file and parsing dates
    prices = pd.read_csv(
        prices_path, 
        quoting=csv.QUOTE_ALL,
        dtype={'market_id': 'string', 'selection_id': 'string', 'atb_ladder': 'string', 'atl_ladder': 'string'},
        parse_dates=['time']
    )

    #creating the ladder as a dictionary
    prices['atb_ladder'] = [ast.literal_eval(x) for x in prices['atb_ladder']]
    prices['atl_ladder'] = [ast.literal_eval(x) for x in prices['atl_ladder']]

    #merging the price and selection files
    df = selection.merge(prices, on = ['market_id', 'selection_id'])
    #assigning best prices available and calculating time relative to market start time
    df = (
        df
        .assign(back_best = lambda x: [np.nan if d.get('p') is None else d.get('p')[0] for d in x['atb_ladder']])
        .assign(lay_best = lambda x: [np.nan if d.get('p') is None else d.get('p')[0] for d in x['atl_ladder']])
        .assign(seconds_before_scheduled_off = lambda x: round((x['event_date'] - x['time']).dt.total_seconds()))
        .query('seconds_before_scheduled_off < @log1_Start')
    )

    #creating a unique list of market ids
    marketids = df['market_id'].unique().tolist()

    #writing each market to its own csv file
    for market in marketids:
        #create a dataframe and a naming convention for this market
        pricing_data=df[(df['market_id']==market)]
        if pricing_data.empty:
            continue
        race_track=pricing_data['track'].iloc[0]
        market_name=pricing_data['event_name'].iloc[0]
        market_time=pricing_data['event_date'].iloc[0]
        off=market_time.strftime('%Y-%m-%d')
        #write race details to the dataframe
        pricing_data['race']=pricing_data['event_name'].str.split('R').str[1]
        pricing_data['race']=pricing_data['race'].str.split(' ').str[0]
        pricing_data['distance']=pricing_data['event_name'].str.split(' ').str[1]
        pricing_data['distance']=pricing_data['distance'].str.split('m').str[0]
        pricing_data['race_type']=pricing_data['event_name'].str.split('m ').str[1]
        pricing_data['selection_name']=pricing_data['selection_name'].str.split(r'\. ').str[1]
        #convert GMT timezone to AEST/AEDT
        pricing_data['event_date']=pricing_data['event_date'].astype('datetime64[ns]')
        pricing_data['event_date']=pricing_data['event_date'].dt.tz_localize('UTC',ambiguous=False)
        pricing_data['event_date']=pricing_data['event_date'].dt.tz_convert('Australia/Melbourne')
        pricing_data['event_date']=pricing_data['event_date'].dt.tz_localize(None)
        pricing_data['time']=pricing_data['time'].astype('datetime64[ns]')
        pricing_data['time']=pricing_data['time'].dt.tz_localize('UTC',ambiguous=False)
        pricing_data['time']=pricing_data['time'].dt.tz_convert('Australia/Melbourne')
        pricing_data['time']=pricing_data['time'].dt.tz_localize(None)
        #convert GBP to AUD
        event_date=(pd.to_datetime(pricing_data['event_date']).dt.date).iloc[0]
        conversion_rate=CurrencyConverter(fallback_on_missing_rate=True).convert(1,'GBP','AUD',date=event_date)
        pricing_data['traded_volume']=pricing_data['traded_volume']*conversion_rate
        pricing_data.loc[(pricing_data['traded_volume'] < 0), 'traded_volume'] = 0
        pricing_data['traded_volume'] = pricing_data['traded_volume'].round(decimals=2)
        #reorder the dataframe and write to csv
        pricing_data=pricing_data[['event_date','country','track','race','distance','race_type','market_id','selection_id','selection_name',"selection_status",'reduction_factor','result','bsp','time','traded_volume','wap','ltp','atb_ladder','atl_ladder','back_best','lay_best','seconds_before_scheduled_off','sp_near','sp_far','pp_min','pp_max','pp_wap','pp_ltp','pp_volume','bsp_volume','ip_min','ip_max','ip_wap','ip_ltp','ip_volume']]
        pricing_data.to_csv(file_directory+off+' - '+race_track+' - '+market_name+'.csv',index=False)

1.7 Clean-Up

The final piece is just to clean up and remove the intermediate files. If you're encountering errors with the final output, comment this code out so these intermediate files can be viewed.

#removing intermediate working documents to clean up
os.remove(selection_meta)
os.remove(prices_path)

1.8 Conclusion and Next Steps

If you'd like to expand your datasets, here's a list of other accessible data sources that can add additional datapoints and are fairly simple to join:

  • Unsupported API endpoint for a day's racecard - https://apigateway.betfair.com.au/hub/racecard?date=YYYY-MM-DD (simply substitute your date at the end; see the sketch after this list)
  • Unsupported API endpoint for market and runner metadata including Barrier, Trainer, Jockey and Best Tote Price - https://apigateway.betfair.com.au/hub/raceevent/market_id (simply substitute your market_id at the end, including the "1." prefix)
  • Carrot Cruncher Horse Racing Model (url=https://betfair-data-supplier-prod.herokuapp.com/api/widgets/kash-ratings-model/datasets?date='+YYYY-MM-DD+'&presenter=RatingsPresenter&csv=true) (simply substitute your date in the middle). These ratings are available back to 18/2/21.
  • Iggy Joey Greyhound Model (url=https://betfair-data-supplier-prod.herokuapp.com/api/widgets/iggy-joey/datasets?date='+YYYY-MM-DD+'&presenter=RatingsPresenter&csv=true) (simply substitute your date in the middle)
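
As a quick illustration of the first endpoint, here's a hedged sketch using the requests library (an extra dependency not used elsewhere in this tutorial); the payload structure is undocumented, so inspect it before relying on it:

# illustrative sketch: pull a day's racecard from the unsupported Hub endpoint
import requests

response = requests.get('https://apigateway.betfair.com.au/hub/racecard', params={'date': '2022-01-01'})
racecard = response.json() # inspect the structure before joining it to your data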

Hopefully this tutorial has helped and serves as a solid resource for processing these powerful, if cumbersome, TAR files, and assists in your data gathering!

For further tutorials on how to use these new CSV files to create a model, check out our suite of other tutorials in the modelling section.

2.0 Complete Code

2.1 Complete Code Racing Processor

import pandas as pd
import numpy as np
import os
import csv
import tarfile
import zipfile
import bz2
import glob
import ast
import functools
import warnings
from unittest.mock import patch
from typing import List, Tuple
from itertools import zip_longest

import betfairlightweight
from betfairlightweight import StreamListener
from betfair_data import bflw
from betfair_data import PriceSize
from currency_converter import CurrencyConverter

from pandas.errors import SettingWithCopyWarning
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)

file_directory = '' #INSERT FILE DIRECTORY WHERE TAR FILES ARE STORED

log1_Start = 60 * 10 # Seconds before scheduled off to start recording data for data segment one
log1_Step = 30       # Seconds between log steps for first data segment
log2_Start = 60 * 1  # Seconds before scheduled off to start recording data for data segment two
log2_Step = 5    # Seconds between log steps for second data segment

# splitting race name and returning the parts 
def split_anz_horse_market_name(market_name: str) -> Tuple[str, str, str]:
    # return race no, length, race type
    # input samples: 
    # 'R6 1400m Grp1' -> ('R6','1400m','grp1')
    # 'R1 1609m Trot M' -> ('R1', '1609m', 'trot')
    # 'R4 1660m Pace M' -> ('R4', '1660m', 'pace')
    parts = market_name.split(' ')
    race_no = parts[0] 
    race_len = parts[1].split('m')[0]
    race_type = parts[2].lower() 
    return (race_no, race_len, race_type)

# filtering markets to those that fit the following criteria
def filter_market(market: bflw.MarketBook) -> bool: 
    d = market.market_definition
    return (d is not None
        and d.country_code == 'AU' 
        and d.market_type == 'WIN'
        and (c := split_anz_horse_market_name(d.name)[2]) != 'trot' and c != 'pace' #strips out Harness Races
        #and (c := split_anz_horse_market_name(d.name)[2]) == 'hcap' #can use this to filter by race type
        and int(split_anz_horse_market_name(d.name)[1]) >= 1200 #filters to races of 1200m or more (int conversion for a numeric comparison)
        )

# Simply add the relevant variable below to the market filter function above, along with the filter value
# Equals (== 'Value in quotes' or True/False/None), Does Not Equal (!= 'Value in quotes' or True/False/None) - FOR ALL TYPES
# Greater than (>), Greater than or equal to (>=), Less than (<), Less than or equal to (<=) - FOR INT/FLOAT
# For a list of values use 'in'

# and d.betting_type: str - ODDS, ASIAN_HANDICAP_SINGLES, ASIAN_HANDICAP_DOUBLES or LINE
# and d.bsp_market: bool - True, False
# and d.country_code: str - list of codes can be found here: https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2 - Default value is 'GB' - Australia = 'AU', New Zealand = 'NZ'
# and d.event_id: str - PARENT_EVENT_ID
# and d.event_name: Optional[str] - Usually the name of the Match-Up (e.g. Bangladesh v Sri Lanka) or Race Meeting Name (e.g. Wangaratta (AUS) 1st Dec) - Note: Dictionaries don't support wildcard searches
# and d.event_type_id: str - SportID [Horse Racing - 7, Greyhounds - 4339]
# and d.market_base_rate: float - Market Commission Rate
# and d.market_type: str - e.g. "WIN"
# and d.name: Optional[str] - market name (e.g. R1 1170m Mdn)
# and d.number_of_active_runners: int - number of horses/dogs in the race
# and d.number_of_winners: int - Win market 1, Place markets 2+
# and d.turn_in_play_enabled: bool - True, False
# and d.venue: Optional[str] - Racing Only - Track

trading = betfairlightweight.APIClient(username = "username", password = "password", app_key="app_key")
listener = StreamListener(max_latency=None)

stream_files = glob.glob(file_directory+"*.tar") 
selection_meta = file_directory+"metadata.csv"
prices_path =  file_directory+"preplay.csv"

# rounding to 2 decimal places or returning '' if blank
def as_str(v) -> str:
    return '%.2f' % v if (type(v) is float) or (type(v) is int) else v if type(v) is str else ''

# returning smaller of two numbers where min not 0
def min_gr0(a: float, b: float) -> float:
    if a <= 0:
        return b
    if b <= 0:
        return a

    return min(a, b)

# parsing price data and pulling out weighted avg price, matched, min price and max price
def parse_traded(traded: List[PriceSize]) -> Tuple[float, float, float, float]:
    if len(traded) == 0: 
        return (None, None, None, None)

    (wavg_sum, matched, min_price, max_price) = functools.reduce(
        lambda total, ps: (
            total[0] + (ps.price * ps.size), # wavg_sum before we divide by total matched
            total[1] + ps.size, # total matched
            min(total[2], ps.price), # min price matched
            max(total[3], ps.price), # max price matched
        ),
        traded,
        (0, 0, 1001, 0) # starting default values
    )

    wavg_sum = (wavg_sum / matched) if matched > 0 else None # dividing sum of wavg by total matched
    matched = matched if matched > 0 else None 
    min_price = min_price if min_price != 1001 else None
    max_price = max_price if max_price != 0 else None

    return (wavg_sum, matched, min_price, max_price)


def load_markets(file_paths):
    for file_path in file_paths:
        print(file_path)
        print("__ Parsing Detailed Prices ___ ")
        if os.path.isdir(file_path):
            for path in glob.iglob(file_path + '**/**/*.bz2', recursive=True):
                f = bz2.BZ2File(path, 'rb')
                yield f
                f.close()
        elif os.path.isfile(file_path):
            ext = os.path.splitext(file_path)[1]
            # iterate through a tar archive
            if ext == '.tar':
                with tarfile.TarFile(file_path) as archive:
                    for file in archive:
                        yield bz2.open(archive.extractfile(file))
            # or a zip archive
            elif ext == '.zip':
                with zipfile.ZipFile(file_path) as archive:
                    for file in archive.namelist():
                        yield bz2.open(archive.open(file))

    return None

# returning the price at position n of a price ladder, or '' if the ladder is shorter
def slicePrice(l, n):
    try:
        x = l[n].price
    except IndexError:
        x = ""
    return(x)

# returning the size at position n of a price ladder, or '' if the ladder is shorter
def sliceSize(l, n):
    try:
        x = l[n].size
    except IndexError:
        x = ""
    return(x)

# pulling the top n prices and volumes from an available-to-back/lay ladder into a dict
def pull_ladder(availableLadder, n = 5):
    out = {}
    price = []
    volume = []
    if len(availableLadder) == 0:
        return(out)
    else:
        for rung in availableLadder[0:n]:
            price.append(rung.price)
            volume.append(rung.size)

        out["p"] = price
        out["v"] = volume
        return(out)

def final_market_book(s):

    with patch("builtins.open", lambda f, _: f):

        gen = s.get_generator()

        last_market_book = None

        for market_books in gen():

            # Check if this market book meets our market filter ++++++++++++++++++++++++++++++++++
            if not filter_market(market_books[0]):
                return(None)

            for market_book in market_books:
                last_market_book = market_book

        return(last_market_book)

def loop_prices(s, o):

    with patch("builtins.open", lambda f, _: f):

        gen = s.get_generator()

        marketID = None
        tradeVols = None
        time = None

        for market_books in gen():

            # Check if this market book meets our market filter ++++++++++++++++++++++++++++++++++

            if not filter_market(market_books[0]):
                break

            for market_book in market_books:

                # Time Step Management ++++++++++++++++++++++++++++++++++

                if marketID is None:

                    # No market initialised
                    marketID = market_book.market_id
                    time =  market_book.publish_time

                elif market_book.inplay:

                    # Stop once market inplay
                    break

                else:

                    seconds_to_start = (market_book.market_definition.market_time - market_book.publish_time).total_seconds()

                    if seconds_to_start > log1_Start:

                        # Too early before off to start logging prices
                        continue

                    else:

                        # Update data at different time steps depending on seconds to off
                        wait = log2_Step if seconds_to_start <= log2_Start else log1_Step

                        # New Market
                        if market_book.market_id != marketID:
                            marketID = market_book.market_id
                            time =  market_book.publish_time
                        # (wait) seconds elapsed since last write
                        elif (market_book.publish_time - time).total_seconds() > wait:
                            time = market_book.publish_time
                        # fewer than (wait) seconds elapsed continue to next loop
                        else:
                            continue

                # Execute Data Logging ++++++++++++++++++++++++++++++++++

                for runner in market_book.runners:

                    try:
                        selection_status = runner.status
                        reduction_factor = runner.adjustment_factor
                        atb_ladder = pull_ladder(runner.ex.available_to_back, n = 5)
                        atl_ladder = pull_ladder(runner.ex.available_to_lay, n = 5)
                        spn = runner.sp.near_price
                        spf = runner.sp.far_price
                    except AttributeError:
                        # runner attributes (e.g. sp/ex) missing - fall back to blanks
                        selection_status = None
                        reduction_factor = None
                        atb_ladder = {}
                        atl_ladder = {}
                        spn = None
                        spf = None

                    # Calculate Current Traded Volume + Traded WAP
                    limitTradedVol = sum([rung.size for rung in runner.ex.traded_volume])
                    if limitTradedVol == 0:
                        limitWAP = ""
                    else:
                        limitWAP = sum([rung.size * rung.price for rung in runner.ex.traded_volume]) / limitTradedVol
                        limitWAP = round(limitWAP, 2)

                    o.writerow(
                        (
                            market_book.market_id,
                            market_book.number_of_active_runners,
                            runner.selection_id,
                            market_book.publish_time,
                            limitTradedVol,
                            limitWAP,
                            runner.last_price_traded or "",
                            selection_status,
                            reduction_factor,
                            str(atb_ladder).replace(' ',''), 
                            str(atl_ladder).replace(' ',''),
                            str(spn),
                            str(spf)
                        )
                    )


def parse_prices(dir, out_file):

    with open(out_file, "w+") as output:

        writer = csv.writer(
            output, 
            delimiter=',',
            lineterminator='\r\n',
            quoting=csv.QUOTE_ALL
        )

        writer.writerow(("market_id","active_runners","selection_id","time","traded_volume","wap","ltp","selection_status",'reduction_factor',"atb_ladder","atl_ladder","sp_near","sp_far"))

        for file_obj in load_markets(dir):

            stream = trading.streaming.create_historical_generator_stream(
                file_path=file_obj,
                listener=listener,
            )

            loop_prices(stream, writer)



#loop over each TAR file
for tar in stream_files:
    parse_prices([tar], prices_path)
    print("__ Parsing Market and Selection Data ___ ")

    # record prices to a file
    with open(selection_meta, "w") as output:
        # defining column headers
        output.write("market_id,event_date,country,track,event_name,selection_id,selection_name,result,bsp,pp_min,pp_max,pp_wap,pp_ltp,pp_volume,bsp_volume,ip_min,ip_max,ip_wap,ip_ltp,ip_volume\n")

        for i, g in enumerate(bflw.Files([tar])):
            print("Market {}".format(i), end='\r')

            def get_pre_post_final():
                eval_market = None
                prev_market = None
                preplay_market = None
                postplay_market = None       

                for market_books in g:
                    for market_book in market_books:
                        # if market doesn't meet filter return out
                        if eval_market is None and ((eval_market := filter_market(market_book)) == False):
                            return (None, None, None)

                        # final market view before market goes in play
                        if prev_market is not None and prev_market.inplay != market_book.inplay:
                            preplay_market = prev_market

                        # final market view at the conclusion of the market
                        if prev_market is not None and prev_market.status == "OPEN" and market_book.status != prev_market.status:
                            postplay_market = market_book

                        # update reference to previous market
                        prev_market = market_book

                return (preplay_market, postplay_market, prev_market) # prev is now final

            (preplay_market, postplay_market, final_market) = get_pre_post_final()

            # no price data for market
            if postplay_market is None:
                continue

            preplay_traded = [ (r.last_price_traded, r.ex.traded_volume) for r in preplay_market.runners ] if preplay_market is not None else None

            postplay_traded = [ (
                r.last_price_traded,
                r.ex.traded_volume,
                # calculating SP traded vol as smaller of back_stake_taken or (lay_liability_taken / (bsp - 1))        
                min_gr0(
                    next((pv.size for pv in r.sp.back_stake_taken if pv.size > 0), 0),
                    next((pv.size for pv in r.sp.lay_liability_taken if pv.size > 0), 0)  / ((r.sp.actual_sp if (type(r.sp.actual_sp) is float) or (type(r.sp.actual_sp) is int) else 0) - 1)
                ) if r.sp.actual_sp is not None else 0,
            ) for r in postplay_market.runners ]

            runner_data = [
            {
                'selection_id': r.selection_id,
                'selection_name': next((rd.name for rd in final_market.market_definition.runners if rd.selection_id == r.selection_id), None),
                'selection_status': r.status,
                'sp': as_str(r.sp.actual_sp),
            }
            for r in final_market.runners 
            ]

            # runner price data for markets that go in play
            if preplay_traded is not None:
                def runner_vals(r):
                    (pre_ltp, pre_traded), (post_ltp, post_traded, sp_traded) = r

                    inplay_only = list(filter(lambda ps: ps.size > 0, [
                        PriceSize(
                            price=post_ps.price, 
                            size=post_ps.size - next((pre_ps.size for pre_ps in pre_traded if pre_ps.price == post_ps.price), 0)
                        )
                        for post_ps in post_traded 
                    ]))

                    (ip_wavg, ip_matched, ip_min, ip_max) = parse_traded(inplay_only)
                    (pre_wavg, pre_matched, pre_min, pre_max) = parse_traded(pre_traded)

                    return {
                        'preplay_ltp': as_str(pre_ltp),
                        'preplay_min': as_str(pre_min),
                        'preplay_max': as_str(pre_max),
                        'preplay_wavg': as_str(pre_wavg),
                        'preplay_matched': as_str(pre_matched or 0),
                        'bsp_matched': as_str(sp_traded or 0),
                        'inplay_ltp': as_str(post_ltp),
                        'inplay_min': as_str(ip_min),
                        'inplay_max': as_str(ip_max),
                        'inplay_wavg': as_str(ip_wavg),
                        'inplay_matched': as_str(ip_matched),
                    }

                runner_traded = [ runner_vals(r) for r in zip_longest(preplay_traded, postplay_traded, fillvalue=PriceSize(0, 0)) ]

            # runner price data for markets that don't go in play
            else:
                def runner_vals(r):
                    (ltp, traded, sp_traded) = r
                    (wavg, matched, min_price, max_price) = parse_traded(traded)

                    return {
                        'preplay_ltp': as_str(ltp),
                        'preplay_min': as_str(min_price),
                        'preplay_max': as_str(max_price),
                        'preplay_wavg': as_str(wavg),
                        'preplay_matched': as_str(matched or 0),
                        'bsp_matched': as_str(sp_traded or 0),
                        'inplay_ltp': '',
                        'inplay_min': '',
                        'inplay_max': '',
                        'inplay_wavg': '',
                        'inplay_matched': '',
                    }

                runner_traded = [ runner_vals(r) for r in postplay_traded ]

            # printing to csv for each runner
            for (rdata, rprices) in zip(runner_data, runner_traded):
                # defining data to go in each column
                output.write(
                    "{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}\n".format(
                        postplay_market.market_id,
                        postplay_market.market_definition.market_time,
                        postplay_market.market_definition.country_code,
                        postplay_market.market_definition.venue,
                        postplay_market.market_definition.name,
                        rdata['selection_id'],
                        rdata['selection_name'],
                        rdata['selection_status'],
                        rdata['sp'],
                        rprices['preplay_min'],
                        rprices['preplay_max'],
                        rprices['preplay_wavg'],
                        rprices['preplay_ltp'],
                        rprices['preplay_matched'],
                        rprices['bsp_matched'],
                        rprices['inplay_min'],
                        rprices['inplay_max'],
                        rprices['inplay_wavg'],
                        rprices['inplay_ltp'],
                        rprices['inplay_matched'],
                    )
                )


    #loading selection file and parsing dates
    selection = pd.read_csv(selection_meta, dtype={'market_id': object, 'selection_id': object}, parse_dates = ['event_date'])

    #loading price file and parsing dates
    prices = pd.read_csv(
        prices_path, 
        quoting=csv.QUOTE_ALL,
        dtype={'market_id': 'string', 'selection_id': 'string', 'atb_ladder': 'string', 'atl_ladder': 'string'},
        parse_dates=['time']
    )

    #creating the ladder as a dictionary
    prices['atb_ladder'] = [ast.literal_eval(x) for x in prices['atb_ladder']]
    prices['atl_ladder'] = [ast.literal_eval(x) for x in prices['atl_ladder']]

    #merging the price and selection files
    df = selection.merge(prices, on = ['market_id', 'selection_id'])
    #assigning best prices available and calculating time relative to market start time
    df = (
        df
        .assign(back_best = lambda x: [np.nan if d.get('p') is None else d.get('p')[0] for d in x['atb_ladder']])
        .assign(lay_best = lambda x: [np.nan if d.get('p') is None else d.get('p')[0] for d in x['atl_ladder']])
        .assign(seconds_before_scheduled_off = lambda x: round((x['event_date'] - x['time']).dt.total_seconds()))
        .query('seconds_before_scheduled_off < @log1_Start')
    )

    #creating a unique list of market ids
    marketids = df['market_id'].unique().tolist()

    #writing each market to its own csv file
    for market in marketids:
        #create a dataframe and a naming convention for this market
        pricing_data=df[(df['market_id']==market)]
        if pricing_data.empty:
            continue
        race_track=pricing_data['track'].iloc[0]
        market_name=pricing_data['event_name'].iloc[0]
        market_time=pricing_data['event_date'].iloc[0]
        off=market_time.strftime('%Y-%m-%d')
        #write race details to the dataframe
        pricing_data['race']=pricing_data['event_name'].str.split('R').str[1]
        pricing_data['race']=pricing_data['race'].str.split(' ').str[0]
        pricing_data['distance']=pricing_data['event_name'].str.split(' ').str[1]
        pricing_data['distance']=pricing_data['distance'].str.split('m').str[0]
        pricing_data['race_type']=pricing_data['event_name'].str.split('m ').str[1]
        pricing_data['selection_name']=pricing_data['selection_name'].str.split(r'\. ').str[1]
        #convert GMT timezone to AEST/AEDT
        pricing_data['event_date']=pricing_data['event_date'].astype('datetime64[ns]')
        pricing_data['event_date']=pricing_data['event_date'].dt.tz_localize('UTC',ambiguous=False)
        pricing_data['event_date']=pricing_data['event_date'].dt.tz_convert('Australia/Melbourne')
        pricing_data['event_date']=pricing_data['event_date'].dt.tz_localize(None)
        pricing_data['time']=pricing_data['time'].astype('datetime64[ns]')
        pricing_data['time']=pricing_data['time'].dt.tz_localize('UTC',ambiguous=False)
        pricing_data['time']=pricing_data['time'].dt.tz_convert('Australia/Melbourne')
        pricing_data['time']=pricing_data['time'].dt.tz_localize(None)
        #convert GBP to AUD
        event_date=(pd.to_datetime(pricing_data['event_date']).dt.date).iloc[0]
        conversion_rate=CurrencyConverter(fallback_on_missing_rate=True).convert(1,'GBP','AUD',date=event_date)
        pricing_data['traded_volume']=pricing_data['traded_volume']*conversion_rate
        pricing_data.loc[(pricing_data['traded_volume'] < 0), 'traded_volume'] = 0
        pricing_data['traded_volume'] = pricing_data['traded_volume'].round(decimals=2)
        #reorder the dataframe and write to csv
        pricing_data=pricing_data[['event_date','country','track','race','distance','race_type','market_id','selection_id','selection_name',"selection_status",'reduction_factor','result','bsp','time','traded_volume','wap','ltp','atb_ladder','atl_ladder','back_best','lay_best','seconds_before_scheduled_off','sp_near','sp_far','pp_min','pp_max','pp_wap','pp_ltp','pp_volume','bsp_volume','ip_min','ip_max','ip_wap','ip_ltp','ip_volume']]
        pricing_data.to_csv(file_directory+off+' - '+race_track+' - '+market_name+'.csv',index=False)


#removing intermediate working documents to clean up
os.remove(selection_meta)
os.remove(prices_path)

2.2 Complete Code Sports Processor

import pandas as pd
import numpy as np
import os
import csv
import tarfile
import zipfile
import bz2
import glob
import ast
import warnings
from unittest.mock import patch

import betfairlightweight
from betfairlightweight import StreamListener
from betfair_data import bflw #"Import 'betfair_data.bflw' could not be resolved from source" - this is a known issue, the script should still run
from currency_converter import CurrencyConverter

from pandas.errors import SettingWithCopyWarning
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)

file_directory = ''# INSERT FILE DIRECTORY WHERE TAR FILES ARE STORED

log1_Start = 60 * 60  # seconds before scheduled off to start recording data for data segment one
log1_Step = 60  # seconds between log steps for first data segment
log2_Start = 60 * 10  # seconds before scheduled off to start recording data for data segment two
log2_Step = 10  # seconds between log steps for second data segment


def filter_market(market: bflw.MarketBook) -> bool:
    d = market.market_definition
    return (
        d is not None
        # and d.country_code in ['ES']
        # and d.market_type == 'MATCH_ODDS'
        and d.name in ['Match Odds', '1st Innings 20 Overs Line']
        # and d.betting_type == 'ODDS'
    )

# Simply add the relevant variable below to the market filter function above, along with the filter value
# Equals (== 'Value in quotes' or True/False/None), Does Not Equal (!= 'Value in quotes' or True/False/None) - FOR ALL TYPES
# Greater than (>), Greater than or equal to (>=), Less than (<), Less than or equal to (<=) - FOR INT/FLOAT
# For a list of values use 'in'

# and d.betting_type: str - ODDS, ASIAN_HANDICAP_SINGLES, ASIAN_HANDICAP_DOUBLES or LINE
# and d.bsp_market: bool - True, False
# and d.country_code: str - list of codes can be found here: https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2 - Default value is 'GB' - Australia = 'AU', New Zealand = 'NZ'
# and d.event_id: str - PARENT_EVENT_ID
# and d.event_name: Optional[str] - Usually the name of the Match-Up (e.g. Bangladesh v Sri Lanka) or Race Name (e.g. R6 1400m Grp1) - Note: Dictionaries don't support wildcard searches
# and d.event_type_id: str - SportID [Soccer - 1, Tennis - 2, Golf - 3, Cricket - 4, AFL - 61420]
# and d.market_base_rate: float - Market Commission Rate
# and d.market_type: str - e.g. "MATCH_ODDS", "1ST_INNINGS_RUNS","TO_QUALIFY" - always all caps with "_" replacing spaces
# and d.name: Optional[str] - market name (e.g. Sri Lanka 1st Inns Runs)
# and d.number_of_active_runners: int - Head-To-Head markets will have 2
# and d.number_of_winners: int - Odds Markets usually 1, Line/Handicap markets usually 0
# and d.regulators: str - 'MR_INT' to remove ring-fenced exchange markets 
# and d.turn_in_play_enabled: bool - True, False
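# For example, a hypothetical filter for Australian AFL match odds markets might look like the below
# (an illustrative sketch only - swap in whichever fields and values from the list above suit your data):
#
# def filter_market(market: bflw.MarketBook) -> bool:
#     d = market.market_definition
#     return (
#         d is not None
#         and d.event_type_id == '61420'
#         and d.market_type == 'MATCH_ODDS'
#         and d.country_code == 'AU'
#     )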


# the APIClient below only drives the historical stream parser and never logs in, so these placeholder credentials don't need to be real
trading = betfairlightweight.APIClient(username = "username", password = "password", app_key="app_key")
listener = StreamListener(max_latency=None)
stream_files = glob.glob(file_directory+"*.tar") 
selection_meta = file_directory+"metadata.csv"
prices_path =  file_directory+"preplay.csv"

# rounding to 2 decimal places or returning '' if blank
def as_str(v) -> str:
    return '%.2f' % v if (type(v) is float) or (type(v) is int) else v if type(v) is str else ''
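# e.g. as_str(2.5) returns '2.50' and as_str(None) returns ''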

# returning smaller of two numbers where min not 0
def min_gr0(a: float, b: float) -> float:
    if a <= 0:
        return b
    if b <= 0:
        return a

    return min(a, b)
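# e.g. min_gr0(0, 5.0) returns 5.0, where plain min(0, 5.0) would return 0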

def load_markets(file_paths):
    for file_path in file_paths:
        print(file_path)
        print("__ Parsing Detailed Prices ___ ")
        if os.path.isdir(file_path):
            for path in glob.iglob(file_path + '**/**/*.bz2', recursive=True):
                f = bz2.BZ2File(path, 'rb')
                yield f
                f.close()
        elif os.path.isfile(file_path):
            ext = os.path.splitext(file_path)[1]
            # iterate through a tar archive
            if ext == '.tar':
                with tarfile.TarFile(file_path) as archive:
                    for file in archive:
                        yield bz2.open(archive.extractfile(file))
            # or a zip archive
            elif ext == '.zip':
                with zipfile.ZipFile(file_path) as archive:
                    for file in archive.namelist():
                        yield bz2.open(archive.open(file))

    return None
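# load_markets yields one decompressed, file-like market stream at a time, whether the path points to a
# directory of .bz2 files, a .tar archive or a .zip archive, so the rest of the script doesn't care which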

def slicePrice(l, n):
    # safely pull the price from the nth rung of a ladder, returning '' if the rung doesn't exist
    try:
        x = l[n].price
    except IndexError:
        x = ""
    return(x)

def sliceSize(l, n):
    # safely pull the size from the nth rung of a ladder, returning '' if the rung doesn't exist
    try:
        x = l[n].size
    except IndexError:
        x = ""
    return(x)
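# e.g. slicePrice(runner.ex.available_to_back, 0) returns the best available back price, or '' for an empty ladder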

def pull_ladder(availableLadder, n = 3):
    out = {}
    price = []
    volume = []
    if len(availableLadder) == 0:
        return(out)
    else:
        for rung in availableLadder[0:n]:
            price.append(rung.price)
            volume.append(rung.size)

        out["p"] = price
        out["v"] = volume
        return(out)
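# e.g. pull_ladder(runner.ex.available_to_back, n = 3) might return {'p': [2.0, 2.02, 2.04], 'v': [100.0, 50.0, 25.0]}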

def final_market_book(s):
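    # walk the full stream for one market and hand back the last market book seen, or None if the market fails the filter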

    with patch("builtins.open", lambda f, _: f):

        gen = s.get_generator()

        for market_books in gen():

            # Check if this market book meets our market filter ++++++++++++++++++++++++++++++++++

            if filter_market(market_books[0]) == False:
                return(None)

            for market_book in market_books:

                last_market_book = market_book

        return(last_market_book)

def loop_prices(s, o):

    with patch("builtins.open", lambda f, _: f):

        gen = s.get_generator()

        marketID = None
        tradeVols = None
        time = None

        for market_books in gen():

            # Check if this market book meets our market filter ++++++++++++++++++++++++++++++++++

            if filter_market(market_books[0]) == False:
                break

            for market_book in market_books:

                # Time Step Management ++++++++++++++++++++++++++++++++++

                if marketID is None:

                    # No market initialised
                    marketID = market_book.market_id
                    time =  market_book.publish_time

                elif market_book.status == "CLOSED":

                    # Stop once market settled
                    break

                else:

                    seconds_to_start = (market_book.market_definition.market_time - market_book.publish_time).total_seconds()

                    if seconds_to_start > log1_Start:

                        # Too early before off to start logging prices
                        continue

                    else:

                        # Update data at different time steps depending on seconds to off
                        wait = np.where(seconds_to_start <= log2_Start, log2_Step, log1_Step)

                        # New Market
                        if market_book.market_id != marketID:
                            marketID = market_book.market_id
                            time =  market_book.publish_time
                        # (wait) seconds elapsed since last write
                        elif (market_book.publish_time - time).total_seconds() > wait:
                            time = market_book.publish_time
                        # fewer than (wait) seconds elapsed continue to next loop
                        else:
                            continue

                # Execute Data Logging ++++++++++++++++++++++++++++++++++

                for runner in market_book.runners:

                    try:
                        atb_ladder = pull_ladder(runner.ex.available_to_back, n = 3)
                        atl_ladder = pull_ladder(runner.ex.available_to_lay, n = 3)
                    except:
                        atb_ladder = {}
                        atl_ladder = {}

                    # Calculate Current Traded Volume + Traded WAP
                    limitTradedVol = sum([rung.size for rung in runner.ex.traded_volume])
                    if limitTradedVol == 0:
                        limitWAP = ""
                    else:
                        limitWAP = sum([rung.size * rung.price for rung in runner.ex.traded_volume]) / limitTradedVol
                        limitWAP = round(limitWAP, 2)

                    #Use this section to write rows that are required to join the metadata OR that will change in time
                    o.writerow(
                        (
                            market_book.market_id,
                            runner.selection_id,
                            market_book.publish_time,
                            market_book.inplay,
                            limitTradedVol,
                            limitWAP,
                            runner.last_price_traded or "",
                            str(atb_ladder).replace(' ',''), 
                            str(atl_ladder).replace(' ','')
                        )
                    )


def parse_prices(file_paths, out_file):

    with open(out_file, "w+") as output:

        writer = csv.writer(
            output, 
            delimiter=',',
            lineterminator='\r\n',
            quoting=csv.QUOTE_ALL
        )

        writer.writerow(("market_id","selection_id","time","inplay","traded_volume","wap","ltp","atb_ladder","atl_ladder"))

        for file_obj in load_markets(file_paths):

            stream = trading.streaming.create_historical_generator_stream(
                file_path=file_obj,
                listener=listener,
            )

            loop_prices(stream, writer)

#loop over each TAR file
for tar in stream_files:
    parse_prices([tar], prices_path)
    print("__ Parsing Market and Selection Data ___ ")

    # record market and selection metadata to a file
    with open(selection_meta, "w", encoding='utf-8') as output:
        # defining column headers
        output.write("market_id,market_time,market_type,event_name,market_name,selection_id,x,selection_name,y,result\n")
        #loop over each market in the TAR file
        for i, g in enumerate(bflw.Files([tar])):
            print("Market {}".format(i), end='\r')

            def get_pre_post_final():
                eval_market = None
                prev_market = None
                preplay_market = None
                postplay_market = None       

                for market_books in g:
                    for market_book in market_books:
                        # if market doesn't meet filter return out
                        if eval_market is None and ((eval_market := filter_market(market_book)) == False):
                            return (None, None, None)

                        # final market view before market goes in play
                        if prev_market is not None and prev_market.inplay != market_book.inplay:
                            preplay_market = prev_market

                        # final market view at the conclusion of the market
                        if prev_market is not None and prev_market.status == "OPEN" and market_book.status != prev_market.status:
                            postplay_market = market_book

                        # update reference to previous market
                        prev_market = market_book

                return (preplay_market, postplay_market, prev_market) # prev is now final

            (preplay_market, postplay_market, final_market) = get_pre_post_final()

            # no price data for market
            if postplay_market is None:
                continue

            # note: the traded snapshots below aren't used further in this sports version
            preplay_traded = [ (r.last_price_traded, r.ex.traded_volume) for r in preplay_market.runners ] if preplay_market is not None else None

            postplay_traded = [ (
                r.last_price_traded,
                r.ex.traded_volume,
                # calculating SP traded vol as smaller of back_stake_taken or (lay_liability_taken / (BSP - 1))        
                min_gr0(
                    next((pv.size for pv in r.sp.back_stake_taken if pv.size > 0), 0),
                    next((pv.size for pv in r.sp.lay_liability_taken if pv.size > 0), 0)  / ((r.sp.actual_sp if (type(r.sp.actual_sp) is float) or (type(r.sp.actual_sp) is int) else 0) - 1)
                ) if r.sp.actual_sp is not None else 0,
            ) for r in postplay_market.runners ]

            # generic selection data
            # note the deliberate trailing commas on the next two lines: they wrap each value in a
            # one-element tuple, and the pandas clean-up below strips the tuple syntax back off
            for r in final_market.runners:
                selection_id=r.selection_id,
                selection_name=next((rd.name for rd in final_market.market_definition.runners if rd.selection_id == r.selection_id), None),
                selection_status=r.status

                # printing to csv for each selection
                output.write(
                    "{},{},{},{},{},{},{},{}\n".format(
                        postplay_market.market_id,
                        postplay_market.market_definition.market_time,
                        postplay_market.market_definition.market_type,
                        postplay_market.market_definition.event_name,
                        postplay_market.market_definition.name,
                        selection_id,
                        selection_name,
                        selection_status
                    )
                )

    # loading selection file, parsing dates and cleaning the table
    selection = pd.read_csv(
        selection_meta, dtype={'market_id': object, 'selection_id': object}, parse_dates=['market_time']
    )
    # make the column names explicit (set_axis returns a copy, so assign the result back)
    selection = selection.set_axis(
        [
            'market_id',
            'market_time',
            'market_type',
            'event_name',
            'market_name',
            'selection_id',
            'x',
            'selection_name',
            'y',
            'result'
        ],
        axis=1
    )

    selection = selection[['market_id','market_time','market_type','event_name','market_name','selection_id','selection_name','result']]
    # strip the tuple reprs written above: "(12345,)" becomes "12345" and "('Name',)" becomes "Name"
    selection['selection_id'] = selection['selection_id'].str.split(r'\(').str[1]
    selection['selection_name'] = selection['selection_name'].str.split(r"\('").str[1]
    selection['selection_name'] = selection['selection_name'].str.split("'").str[0]

    # loading price file and parsing dates
    prices = pd.read_csv(
        prices_path,
        quoting=csv.QUOTE_ALL,
        dtype={'market_id': 'string', 'selection_id': 'string', 'atb_ladder': 'string', 'atl_ladder': 'string'},
        parse_dates=['time']
    )

    # creating the ladder as a dictionary
    prices['atb_ladder'] = [ast.literal_eval(x) for x in prices['atb_ladder']]
    prices['atl_ladder'] = [ast.literal_eval(x) for x in prices['atl_ladder']]

    # merging the price and selection files
    df = selection.merge(prices, on=['market_id', 'selection_id'])

    # assigning best prices available and calculating time relative to market start time
    df = (
        df
        .assign(back_best=lambda x: [np.nan if d.get('p') is None else d.get('p')[0] for d in x['atb_ladder']])
        .assign(lay_best=lambda x: [np.nan if d.get('p') is None else d.get('p')[0] for d in x['atl_ladder']])
        .assign(
            seconds_before_scheduled_off=lambda x: round((x['market_time'] - x['time']).dt.total_seconds())
        )
        .query('seconds_before_scheduled_off < @log1_Start')
    )

    # Writing each processed market to its own csv file
    market_ids = df['market_id'].unique().tolist()
    for market in market_ids:
        # Create a dataframe and a naming convention for this market
        pricing_data = df[df['market_id'] == market]
        fixture = pricing_data['event_name'].iloc[0].replace("/", "")
        market_name = pricing_data['market_name'].iloc[0]
        market_time = pricing_data['market_time'].iloc[0]
        off = market_time.strftime('%Y-%m-%d')
        # Convert GBP to AUD
        event_date = (pd.to_datetime(pricing_data['market_time']).dt.date).iloc[0]
        conversion_rate = CurrencyConverter(fallback_on_missing_rate=True).convert(1, 'GBP', 'AUD', date=event_date)
        pricing_data['traded_volume'] = pricing_data['traded_volume'] * conversion_rate
        pricing_data.loc[pricing_data['traded_volume'] < 0, 'traded_volume'] = 0
        pricing_data['traded_volume'] = pricing_data['traded_volume'].round(decimals=2)
        pricing_data.to_csv(file_directory + off + ' - ' + fixture + ' - ' + market_name + '.csv', index=False)
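        # the line above produces one file per market, named like '2022-03-01 - Bangladesh v Sri Lanka - Match Odds.csv'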

#removing intermediate working documents to clean up
os.remove(selection_meta)
os.remove(prices_path)

Disclaimer

Note that whilst models and automated strategies are fun and rewarding to create, we can't promise that your model or betting strategy will be profitable, and we make no representations in relation to the code shared or information on this page. If you're using this code or implementing your own strategies, you do so entirely at your own risk and you are responsible for any winnings/losses incurred. Under no circumstances will Betfair be liable for any loss or damage you suffer.