Take away the pain! Processing TAR Files 101
Even though we've got some great tutorials up here on the Automation Hub, a common request from automation customers is a simple plug-and-play tutorial and script for parsing the PRO TAR files into CSVs. So here it is: an easy-to-follow tutorial that builds on existing tutorials by Tom Bishop and others to easily and painlessly process the JSON files inside these archives into CSVs ready to be modelled.
And don't worry, this tutorial has plenty of error-handling built in so (hopefully) you won't wake up in the morning to find the script failed at 1AM while you were dreaming of the final straight at Moonee Valley.
This tutorial concerns itself primarily with racing; however, at the bottom of the page we've also included complete code suited for parsing sports TAR files, including handling for line/handicap markets.
So without further ado, let's jump into it!
Cheat sheet
- This is presented as a Jupyter notebook, as this format is interactive and lets you run snippets of code from within the notebook. To use this functionality you'll need to download a copy of the ipynb file locally and open it in a text editor that supports notebooks (e.g. VS Code).
- If you're looking for the complete code, head to the bottom of the page or download the script from Github.
- To run the code, save it to your machine, open a command prompt or a terminal in your text editor of choice (we're using VS Code), make sure you've navigated in the terminal to the folder you've saved the script in, then type py main.py (or whatever you've called your script file if not main) and hit enter. To stop the code running, use Ctrl+C.
- Make sure you amend your data path to point to your data file. We'll be taking as input a historical TAR file downloaded from the Betfair historic data site. We're using a PRO version, though the code should work on ADVANCED too. This approach won't work with the BASIC data tier. For more information on these files please reach out to us at automation
- We're using the betfairlightweight and betfair_data packages to do the heavy lifting
- We've also posted the completed code logic on the betfair-downunder Github repo.
1.0 Setup
1.1 Importing libraries
Once again I'll be presenting the analysis in a Jupyter notebook and using Python as the programming language.
You'll need betfairlightweight and betfair_data, which you can install with something like pip install betfairlightweight and pip install betfair_data. The currency conversion step below also uses the currency_converter package.
NOTE: "Import 'betfair_data.bflw' could not be resolved from source" is a known linting issue - the script should still run.
import pandas as pd
import numpy as np
import os
import csv
import tarfile
import zipfile
import bz2
import glob
import ast
from unittest.mock import patch
import betfairlightweight
from betfairlightweight import StreamListener
from betfair_data import bflw
from betfair_data import PriceSize
import functools
from typing import List, Tuple
from pandas.errors import SettingWithCopyWarning
import warnings
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)
from itertools import zip_longest
from currency_converter import CurrencyConverter
1.2 Picking our intervals
Here we'll define our working folder and the timestamps for which we want to pull all of our data. We've set the script to pull prices from 10 minutes out from the jump at 30 second intervals and then at 5 second intervals from 1 minute out. Feel free to change as you need.
PRO TIP: copying your file path from Windows Explorer works really well, as long as you replace the backslashes with forward slashes
file_directory= '' #INSERT FILE DIRECTORY WHERE TAR FILES ARE STORED
log1_Start = 60 * 10 # Seconds before scheduled off to start recording data for data segment one
log1_Step = 30 # Seconds between log steps for first data segment
log2_Start = 60 * 1 # Seconds before scheduled off to start recording data for data segment two
log2_Step = 5 # Seconds between log steps for second data segment
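As a quick sanity check, the defaults above work out to log points at roughly these seconds before the off (approximate only, since the loop writes on the first market book received after each interval elapses):
# Approximate seconds-before-the-off at which the default settings log a row
log_points = list(range(log1_Start, log2_Start, -log1_Step)) + list(range(log2_Start, 0, -log2_Step))
print(log_points) # [600, 570, ..., 90, 60, 55, 50, ..., 5]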
1.3 Filtering the markets
We'll then define a market filter as well as a way to filter out the harness racing markets. The different filter variables are listed below and can be added or removed as required. Some of these aren't that useful but have been listed for the sake of completeness.
# splitting race name and returning the parts
def split_anz_horse_market_name(market_name: str) -> Tuple[str, str, str]:
# return race no, length, race type
# input samples:
# 'R6 1400m Grp1' -> ('R6', '1400', 'grp1')
# 'R1 1609m Trot M' -> ('R1', '1609', 'trot')
# 'R4 1660m Pace M' -> ('R4', '1660', 'pace')
parts = market_name.split(' ')
race_no = parts[0]
race_len = parts[1].split('m')
race_len = race_len[0]
race_type = parts[2].lower()
return (race_no, race_len, race_type)
# filtering markets to those that fit the following criteria
def filter_market(market: bflw.MarketBook) -> bool:
d = market.market_definition
return (d is not None
and d.country_code == 'AU'
and d.market_type == 'WIN'
and (c := split_anz_horse_market_name(d.name)[2]) != 'trot' and c != 'pace' #strips out Harness Races
#and (c := split_anz_horse_market_name(d.name)[2]) == 'hcap'#can use this to filter by race type
#and int(split_anz_horse_market_name(d.name)[1]) >= 1200 #can use this to filter by race length (cast to int so it isn't a string comparison)
)
# Simply add the below variable name to the market filter function above with the filter value
# Equals (== 'Value in Quotation' or True/False/None), Does Not Equal (!= 'Value in Quotation' or True/False/None) - FOR ALL TYPES
# Greater than (>), Greater than or equal to (>=), Less than (<), Less than or equal to (<=) - FOR INT/FLOAT
# For a list of values use 'in'
# and d.betting_type: str - ODDS, ASIAN_HANDICAP_SINGLES, ASIAN_HANDICAP_DOUBLES or LINE
# and d.bsp_market: bool - True, False
# and d.country_code: str - list of codes can be found here: https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2 - Default value is 'GB' - Australia = 'AU', New Zealand = 'NZ'
# and d.event_id: str - PARENT_EVENT_ID
# and d.event_name: Optional[str] - Usually the name of the Match-Up (e.g. Bangladesh v Sri Lanka) or Race Meeting Name (e.g. Wangaratta (AUS) 1st Dec) - Note: Dictionaries don't support wildcard searches
# and d.event_type_id: str - SportID [Horse Racing - 7, Greyhounds - 4339]
# and d.market_base_rate: float - Market Commission Rate
# and d.market_type: str - e.g. "WIN"
# and d.name: Optional[str] - market name (e.g. R1 1170m Mdn)
# and d.number_of_active_runners: int - number of horses/dogs in the race
# and d.number_of_winners: int - Win market 1, Place markets 2+
# and d.turn_in_play_enabled: bool - True, False
# and d.venue: Optional[str] - Racing Only - Track
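As an example, here's a hypothetical variant of the filter for Australian greyhound WIN markets (a sketch only - adjust the conditions to your needs):
# Hypothetical example: filtering to Australian greyhound WIN markets instead
def filter_greyhound_market(market: bflw.MarketBook) -> bool:
    d = market.market_definition
    return (d is not None
        and d.country_code == 'AU'
        and d.event_type_id == '4339' # Greyhounds, per the SportID list above
        and d.market_type == 'WIN'
    )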
1.4 Utility Functions
Here we'll define our trading and listener objects, which handle the JSON file format, along with some more working paths. We've also defined a few functions to generate the required output.
NOTE: Real login credentials are not required here - the API client is only used to parse local files, so the placeholders below are fine.
trading = betfairlightweight.APIClient(username = "username", password = "password", app_key="app_key")
listener = StreamListener(max_latency=None)
stream_files = glob.glob(file_directory+"*.tar")
selection_meta = file_directory+"metadata.csv"
prices_path = file_directory+"preplay.csv"
# rounding to 2 decimal places or returning '' if blank
def as_str(v) -> str:
return '%.2f' % v if (type(v) is float) or (type(v) is int) else v if type(v) is str else ''
# returning smaller of two numbers where min not 0
def min_gr0(a: float, b: float) -> float:
if a <= 0:
return b
if b <= 0:
return a
return min(a, b)
# parsing price data and pulling out weighted avg price, matched, min price and max price
def parse_traded(traded: List[PriceSize]) -> Tuple[float, float, float, float]:
if len(traded) == 0:
return (None, None, None, None)
(wavg_sum, matched, min_price, max_price) = functools.reduce(
lambda total, ps: (
total[0] + (ps.price * ps.size), # wavg_sum before we divide by total matched
total[1] + ps.size, # total matched
min(total[2], ps.price), # min price matched
max(total[3], ps.price), # max price matched
),
traded,
(0, 0, 1001, 0) # starting default values
)
wavg_sum = (wavg_sum / matched) if matched > 0 else None # dividing sum of wavg by total matched
matched = matched if matched > 0 else None
min_price = min_price if min_price != 1001 else None
max_price = max_price if max_price != 0 else None
return (wavg_sum, matched, min_price, max_price)
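To illustrate, here's what parse_traded returns for a small hypothetical traded ladder:
# Hypothetical ladder: $100 matched at 2.0 and $50 matched at 2.2
sample = [PriceSize(price=2.0, size=100.0), PriceSize(price=2.2, size=50.0)]
print(parse_traded(sample)) # (2.0666..., 150.0, 2.0, 2.2) -> (wap, matched, min, max)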
def load_markets(file_paths):
for file_path in file_paths:
print(file_path)
print("__ Parsing Detailed Prices ___ ")
if os.path.isdir(file_path):
for path in glob.iglob(file_path + '**/**/*.bz2', recursive=True):
f = bz2.BZ2File(path, 'rb')
yield f
f.close()
elif os.path.isfile(file_path):
ext = os.path.splitext(file_path)[1]
# iterate through a tar archive
if ext == '.tar':
with tarfile.TarFile(file_path) as archive:
for file in archive:
yield bz2.open(archive.extractfile(file))
# or a zip archive
elif ext == '.zip':
with zipfile.ZipFile(file_path) as archive:
for file in archive.namelist():
yield bz2.open(archive.open(file))
return None
# returning the price at ladder position n, or '' if that rung doesn't exist
def slicePrice(l, n):
try:
x = l[n].price
except (IndexError, AttributeError, TypeError):
x = ""
return(x)
# returning the size at ladder position n, or '' if that rung doesn't exist
def sliceSize(l, n):
try:
x = l[n].size
except (IndexError, AttributeError, TypeError):
x = ""
return(x)
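These two helpers aren't called elsewhere in the script, but they're handy for grabbing a single rung. A quick runnable check using a stand-in rung class (hypothetical values):
from collections import namedtuple

# Stand-in for a ladder rung with .price/.size attributes (hypothetical values)
Rung = namedtuple('Rung', ['price', 'size'])
ladder = [Rung(2.0, 120.5), Rung(1.99, 80.0)]
print(slicePrice(ladder, 0)) # 2.0 - best available price
print(sliceSize(ladder, 5)) # '' - no sixth rung, so we get the blank fallback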
def pull_ladder(availableLadder, n = 5):
out = {}
price = []
volume = []
if len(availableLadder) == 0:
return(out)
else:
for rung in availableLadder[0:n]:
price.append(rung.price)
volume.append(rung.size)
out["p"] = price
out["v"] = volume
return(out)
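pull_ladder returns a compact dict of the top n rungs - this is the string that later lands in the atb_ladder/atl_ladder columns and gets parsed back with ast.literal_eval. Reusing the stand-in Rung class from the example above:
ladder = [Rung(2.0, 120.5), Rung(1.99, 80.0), Rung(1.98, 55.3)]
print(pull_ladder(ladder, n = 5)) # {'p': [2.0, 1.99, 1.98], 'v': [120.5, 80.0, 55.3]}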
def final_market_book(s):
with patch("builtins.open", lambda f, _: f):
gen = s.get_generator()
for market_books in gen():
# Check if this market book meets our market filter ++++++++++++++++++++++++++++++++++
if ((evaluate_market := filter_market(market_books[0])) == False):
return(None)
for market_book in market_books:
last_market_book = market_book
return(last_market_book)
1.5 Parsing Timestamped Prices
Next up are the functions that enable us to read the timestamped prices from the JSON files and output them ready to be joined onto the metadata. This will provide:
- Market ID
- Number of Active Runners in the market (this can help in the event of late scratchings)
- Timestamp (will help us later to calculate time relative to scheduled off time)
- Traded Volume on the selection so far
- Weighted Average Price
- Last Traded Price
- Selection Status (this can help in the event of late scratchings)
- Reduction Factor (effect on the market if the runner were to be scratched)
- Back Ladder (pulls 5 best prices and volume available)
- Lay Ladder (pulls 5 best prices and volume available)
- BSP Near Price
- BSP Far Price
NOTE: Projected BSP displayed on the app/website is the Near Price - more information about Near/Far Prices and their accuracy can be found here
NOTE: The function is defined to stop pulling prices once the market goes in-play. If in-play prices are required, simply remove the condition on market_book.inplay. However, you will then need to insert an in-play flag into each row in order to distinguish when the market goes in-play in the CSV file.
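A minimal sketch of those two changes inside loop_prices (comments only - the sports processor at the bottom of the page takes exactly this approach, writing market_book.inplay into every row):
# Sketch only: to keep logging in-play rows, remove the early exit...
#   elif market_book.inplay:
#       break
# ...and add an in-play flag to each row (plus a matching column header in parse_prices):
#   o.writerow((market_book.market_id, ..., market_book.inplay, ...))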
def loop_prices(s, o):
with patch("builtins.open", lambda f, _: f):
gen = s.get_generator()
marketID = None
tradeVols = None
time = None
for market_books in gen():
# Check if this market book meets our market filter ++++++++++++++++++++++++++++++++++
if ((evaluate_market := filter_market(market_books[0])) == False):
break
for market_book in market_books:
# Time Step Management ++++++++++++++++++++++++++++++++++
if marketID is None:
# No market initialised
marketID = market_book.market_id
time = market_book.publish_time
elif market_book.inplay:
# Stop once market inplay
break
else:
seconds_to_start = (market_book.market_definition.market_time - market_book.publish_time).total_seconds()
if seconds_to_start > log1_Start:
# Too early before off to start logging prices
continue
else:
# Update data at different time steps depending on seconds to off
wait = np.where(seconds_to_start <= log2_Start, log2_Step, log1_Step)
# New Market
if market_book.market_id != marketID:
marketID = market_book.market_id
time = market_book.publish_time
# (wait) seconds elapsed since last write
elif (market_book.publish_time - time).total_seconds() > wait:
time = market_book.publish_time
# fewer than (wait) seconds elapsed continue to next loop
else:
continue
# Execute Data Logging ++++++++++++++++++++++++++++++++++
for runner in market_book.runners:
try:
selection_status = runner.status
reduction_factor = runner.adjustment_factor
atb_ladder = pull_ladder(runner.ex.available_to_back, n = 5)
atl_ladder = pull_ladder(runner.ex.available_to_lay, n = 5)
spn = runner.sp.near_price
spf = runner.sp.far_price
except (AttributeError, TypeError): # e.g. runner.sp or a ladder is missing
selection_status = None
reduction_factor = None
atb_ladder = {}
atl_ladder = {}
spn = None
spf = None
# Calculate Current Traded Volume + Traded WAP
limitTradedVol = sum([rung.size for rung in runner.ex.traded_volume])
if limitTradedVol == 0:
limitWAP = ""
else:
limitWAP = sum([rung.size * rung.price for rung in runner.ex.traded_volume]) / limitTradedVol
limitWAP = round(limitWAP, 2)
o.writerow(
(
market_book.market_id,
market_book.number_of_active_runners,
runner.selection_id,
market_book.publish_time,
limitTradedVol,
limitWAP,
runner.last_price_traded or "",
selection_status,
reduction_factor,
str(atb_ladder).replace(' ',''),
str(atl_ladder).replace(' ',''),
str(spn),
str(spf)
)
)
def parse_prices(dir, out_file):
with open(out_file, "w+") as output:
writer = csv.writer(
output,
delimiter=',',
lineterminator='\r\n',
quoting=csv.QUOTE_ALL
)
writer.writerow(("market_id","active_runners","selection_id","time","traded_volume","wap","ltp","selection_status",'reduction_factor',"atb_ladder","atl_ladder","sp_near","sp_far"))
for file_obj in load_markets(dir):
stream = trading.streaming.create_historical_generator_stream(
file_path=file_obj,
listener=listener,
)
loop_prices(stream, writer)
1.6 The Good Stuff
Now comes the really meaty chunk of code that will do the grunt work for us. We'll generate the timestamped prices for each TAR file separately and then loop over them again to generate the selection metadata. (This reuses a chunk of code from our JSON to CSV Revisited tutorial built on the betfair_data package, which is super speedy because it's mainly written in Rust rather than Python.) In our metadata files we will provide:
- Market ID
- Market Time
- Country Code
- Track/Venue
- Market Name
- Selection ID
- Horse Name
- Result
- BSP
- Preplay Minimum Matched Price
- Preplay Maximum Matched Price
- Preplay Weighted Average Matched Price
- Preplay Last Traded Price
- Preplay Matched Volume (not including SP volume)
- BSP Matched Volume (Back Stake)
- Inplay Minimum Matched Price
- Inplay Maximum Matched Price
- Inplay Weighted Average Matched Price
- Inplay Last Traded Price
- Inplay Matched Volume
Note: For markets that don't go in play (like Greyhounds/Place markets), the 'Inplay' fields will be empty.
Following this metadata generation, we'll join the dataframes together and add the "seconds to scheduled off" field, convert the GMT timestamps in the TAR files to Melbourne time (useful because markets scheduled before 10am local time, or 11am during daylight savings, have an event date that falls on the previous day in GMT), and convert the currency to Australian dollars using historical conversion rates.
The final piece will be outputting the files to CSV. The code is set up to output a separate file for each race, which keeps the number of rows per file down and makes the output easy to open and check for completeness. Changing this to output one CSV file per TAR file or one CSV file per day is fairly straightforward - see the sketch after the code block below.
#loop over each TAR file
for tar in stream_files:
parse_prices([tar], prices_path) #This is where the timestamped prices are generated
print("__ Parsing Market and Selection Data ___ ")
# record prices to a file
with open(selection_meta, "w") as output:
# defining column headers
output.write("market_id,event_date,country,track,event_name,selection_id,selection_name,result,bsp,pp_min,pp_max,pp_wap,pp_ltp,pp_volume,bsp_volume,ip_min,ip_max,ip_wap,ip_ltp,ip_volume\n")
for i, g in enumerate(bflw.Files([tar])):
print("Market {}".format(i), end='\r')
def get_pre_post_final():
eval_market = None
prev_market = None
preplay_market = None
postplay_market = None
for market_books in g:
for market_book in market_books:
# if market doesn't meet filter return out
if eval_market is None and ((eval_market := filter_market(market_book)) == False):
return (None, None, None)
# final market view before market goes in play
if prev_market is not None and prev_market.inplay != market_book.inplay:
preplay_market = prev_market
# final market view at the conclusion of the market
if prev_market is not None and prev_market.status == "OPEN" and market_book.status != prev_market.status:
postplay_market = market_book
# update reference to previous market
prev_market = market_book
return (preplay_market, postplay_market, prev_market) # prev is now final
(preplay_market, postplay_market, final_market) = get_pre_post_final()
# no price data for market
if postplay_market is None:
continue
preplay_traded = [ (r.last_price_traded, r.ex.traded_volume) for r in preplay_market.runners ] if preplay_market is not None else None
postplay_traded = [ (
r.last_price_traded,
r.ex.traded_volume,
# calculating SP traded vol as smaller of back_stake_taken or (lay_liability_taken / (bsp - 1))
min_gr0(
next((pv.size for pv in r.sp.back_stake_taken if pv.size > 0), 0),
next((pv.size for pv in r.sp.lay_liability_taken if pv.size > 0), 0) / ((r.sp.actual_sp if (type(r.sp.actual_sp) is float) or (type(r.sp.actual_sp) is int) else 0) - 1)
) if r.sp.actual_sp is not None else 0,
) for r in postplay_market.runners ]
runner_data = [
{
'selection_id': r.selection_id,
'selection_name': next((rd.name for rd in final_market.market_definition.runners if rd.selection_id == r.selection_id), None),
'selection_status': r.status,
'sp': as_str(r.sp.actual_sp),
}
for r in final_market.runners
]
# runner price data for markets that go in play
if preplay_traded is not None:
def runner_vals(r):
(pre_ltp, pre_traded), (post_ltp, post_traded, sp_traded) = r
inplay_only = list(filter(lambda ps: ps.size > 0, [
PriceSize(
price=post_ps.price,
size=post_ps.size - next((pre_ps.size for pre_ps in pre_traded if pre_ps.price == post_ps.price), 0)
)
for post_ps in post_traded
]))
(ip_wavg, ip_matched, ip_min, ip_max) = parse_traded(inplay_only)
(pre_wavg, pre_matched, pre_min, pre_max) = parse_traded(pre_traded)
return {
'preplay_ltp': as_str(pre_ltp),
'preplay_min': as_str(pre_min),
'preplay_max': as_str(pre_max),
'preplay_wavg': as_str(pre_wavg),
'preplay_matched': as_str(pre_matched or 0),
'bsp_matched': as_str(sp_traded or 0),
'inplay_ltp': as_str(post_ltp),
'inplay_min': as_str(ip_min),
'inplay_max': as_str(ip_max),
'inplay_wavg': as_str(ip_wavg),
'inplay_matched': as_str(ip_matched),
}
runner_traded = [ runner_vals(r) for r in zip_longest(preplay_traded, postplay_traded, fillvalue=PriceSize(0, 0)) ]
# runner price data for markets that don't go in play
else:
def runner_vals(r):
(ltp, traded, sp_traded) = r
(wavg, matched, min_price, max_price) = parse_traded(traded)
return {
'preplay_ltp': as_str(ltp),
'preplay_min': as_str(min_price),
'preplay_max': as_str(max_price),
'preplay_wavg': as_str(wavg),
'preplay_matched': as_str(matched or 0),
'bsp_matched': as_str(sp_traded or 0),
'inplay_ltp': '',
'inplay_min': '',
'inplay_max': '',
'inplay_wavg': '',
'inplay_matched': '',
}
runner_traded = [ runner_vals(r) for r in postplay_traded ]
# printing to csv for each runner
for (rdata, rprices) in zip(runner_data, runner_traded):
# defining data to go in each column
output.write(
"{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}\n".format(
postplay_market.market_id,
postplay_market.market_definition.market_time,
postplay_market.market_definition.country_code,
postplay_market.market_definition.venue,
postplay_market.market_definition.name,
rdata['selection_id'],
rdata['selection_name'],
rdata['selection_status'],
rdata['sp'],
rprices['preplay_min'],
rprices['preplay_max'],
rprices['preplay_wavg'],
rprices['preplay_ltp'],
rprices['preplay_matched'],
rprices['bsp_matched'],
rprices['inplay_min'],
rprices['inplay_max'],
rprices['inplay_wavg'],
rprices['inplay_ltp'],
rprices['inplay_matched'],
)
)
#loading selection file and parsing dates
selection = pd.read_csv(selection_meta, dtype={'market_id': object, 'selection_id': object}, parse_dates = ['event_date'])
#loading price file and parsing dates
prices = pd.read_csv(
prices_path,
quoting=csv.QUOTE_ALL,
dtype={'market_id': 'string', 'selection_id': 'string', 'atb_ladder': 'string', 'atl_ladder': 'string'},
parse_dates=['time']
)
#creating the ladder as a dictionary
prices['atb_ladder'] = [ast.literal_eval(x) for x in prices['atb_ladder']]
prices['atl_ladder'] = [ast.literal_eval(x) for x in prices['atl_ladder']]
#merging the price and selection files
df = selection.merge(prices, on = ['market_id', 'selection_id'])
#assigning best prices available and calculating time relative to market start time
df = (
df
.assign(back_best = lambda x: [np.nan if d.get('p') is None else d.get('p')[0] for d in x['atb_ladder']])
.assign(lay_best = lambda x: [np.nan if d.get('p') is None else d.get('p')[0] for d in x['atl_ladder']])
.assign(seconds_before_scheduled_off = lambda x: round((x['event_date'] - x['time']).dt.total_seconds()))
.query('seconds_before_scheduled_off < @log1_Start')
)
#creating a unique list of market ids
marketids = df['market_id'].unique().tolist()
#writing each market to its own csv file
for market in marketids:
#create a dataframe and a naming convention for this market
pricing_data=df[(df['market_id']==market)]
if pricing_data.empty:
continue
race_track=pricing_data['track'].iloc[0]
market_name=pricing_data['event_name'].iloc[0]
market_time=pricing_data['event_date'].iloc[0]
off=market_time.strftime('%Y-%m-%d')
#write race details to the dataframe
pricing_data['race']=pricing_data['event_name'].str.split('R').str[1]
pricing_data['race']=pricing_data['race'].str.split(' ').str[0]
pricing_data['distance']=pricing_data['event_name'].str.split(' ').str[1]
pricing_data['distance']=pricing_data['distance'].str.split('m').str[0]
pricing_data['race_type']=pricing_data['event_name'].str.split('m ').str[1]
pricing_data['selection_name']=pricing_data['selection_name'].str.split(r'\. ').str[1]
#convert GMT timezone to AEST/AEDT
pricing_data['event_date']=pricing_data['event_date'].astype('datetime64[ns]')
pricing_data['event_date']=pricing_data['event_date'].dt.tz_localize('UTC',ambiguous=False)
pricing_data['event_date']=pricing_data['event_date'].dt.tz_convert('Australia/Melbourne')
pricing_data['event_date']=pricing_data['event_date'].dt.tz_localize(None)
pricing_data['time']=pricing_data['time'].astype('datetime64[ns]')
pricing_data['time']=pricing_data['time'].dt.tz_localize('UTC',ambiguous=False)
pricing_data['time']=pricing_data['time'].dt.tz_convert('Australia/Melbourne')
pricing_data['time']=pricing_data['time'].dt.tz_localize(None)
#convert GBP to AUD
event_date=(pd.to_datetime(pricing_data['event_date']).dt.date).iloc[0]
conversion_rate=CurrencyConverter(fallback_on_missing_rate=True).convert(1,'GBP','AUD',date=event_date)
pricing_data['traded_volume']=pricing_data['traded_volume']*conversion_rate
pricing_data.loc[(pricing_data['traded_volume'] < 0), 'traded_volume'] = 0
pricing_data['traded_volume'] = pricing_data['traded_volume'].round(decimals=2)
#reorder the dataframe and write to csv
pricing_data=pricing_data[['event_date','country','track','race','distance','race_type','market_id','selection_id','selection_name',"selection_status",'reduction_factor','result','bsp','time','traded_volume','wap','ltp','atb_ladder','atl_ladder','back_best','lay_best','seconds_before_scheduled_off','sp_near','sp_far','pp_min','pp_max','pp_wap','pp_ltp','pp_volume','bsp_volume','ip_min','ip_max','ip_wap','ip_ltp','ip_volume']]
pricing_data.to_csv(file_directory+off+' - '+race_track+' - '+market_name+'.csv',index=False)
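For reference, here's a minimal sketch of the "one CSV per TAR file" variant mentioned above - replace the per-market loop with a single write of the merged dataframe (note that the per-race enrichment would then need to be applied to df directly if you still want those columns):
# Sketch: one CSV per TAR file instead of one per market
# tar_stem = os.path.splitext(os.path.basename(tar))[0] # e.g. '2021_12_DecRacingPro' (hypothetical file name)
# df.to_csv(file_directory + tar_stem + '.csv', index=False)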
1.7 Clean-Up
The final piece is just to clean up and remove the intermediate files. If you're encountering errors with the final output, comment this code out so the intermediate files can be inspected.
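The snippet itself (it also appears at the end of the complete code below):
#removing intermediate working documents to clean up
os.remove(selection_meta)
os.remove(prices_path)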
1.8 Conclusion and Next Steps
If you'd like to expand your datasets, here's a list of other accessible data sources that can add additional datapoints and are fairly simple to join (see the sketch after this list for an example request):
- Unsupported API endpoint for a day's racecard - https://apigateway.betfair.com.au/hub/racecard?date=YYYY-MM-DD (simply substitute your date at the end)
- Unsupported API endpoint for market and runner metadata including Barrier, Trainer, Jockey and Best Tote Price - https://apigateway.betfair.com.au/hub/raceevent/market_id (simply substitute your market_id at the end, including the "1.")
- Carrot Cruncher Horse Racing Model - https://betfair-data-supplier-prod.herokuapp.com/api/widgets/kash-ratings-model/datasets?date='+YYYY-MM-DD+'&presenter=RatingsPresenter&csv=true (simply substitute your date in the middle). These ratings are available back to 18/2/21.
- Iggy Joey Greyhound Model - https://betfair-data-supplier-prod.herokuapp.com/api/widgets/iggy-joey/datasets?date='+YYYY-MM-DD+'&presenter=RatingsPresenter&csv=true (simply substitute your date in the middle)
Hopefully this tutorial has helped and provides a great resource for processing these powerful, if cumbersome, TAR files, and assists in your data gathering!
For further tutorials on how to handle these new CSV files to create a model, check out our suite of other tutorials in the modelling section.
2.0 Complete Code
2.1 Complete Code Racing Processor
import pandas as pd
import numpy as np
import os
import csv
import tarfile
import zipfile
import bz2
import glob
import ast
from unittest.mock import patch
import betfairlightweight
from betfairlightweight import StreamListener
from betfair_data import bflw
from betfair_data import PriceSize
import functools
from typing import List, Tuple
from pandas.errors import SettingWithCopyWarning
import warnings
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)
from itertools import zip_longest
from currency_converter import CurrencyConverter
file_directory= 'C:/Users/motykam/Documents/Thoroughbreds/' #INSERT FILE DIRECTORY WHERE TAR FILES ARE STORED
log1_Start = 60 * 10 # Seconds before scheduled off to start recording data for data segment one
log1_Step = 30 # Seconds between log steps for first data segment
log2_Start = 60 * 1 # Seconds before scheduled off to start recording data for data segment two
log2_Step = 1 # Seconds between log steps for second data segment
# splitting race name and returning the parts
def split_anz_horse_market_name(market_name: str) -> Tuple[str, str, str]:
# return race no, length, race type
# input samples:
# 'R6 1400m Grp1' -> ('R6', '1400', 'grp1')
# 'R1 1609m Trot M' -> ('R1', '1609', 'trot')
# 'R4 1660m Pace M' -> ('R4', '1660', 'pace')
parts = market_name.split(' ')
race_no = parts[0]
race_len = parts[1].split('m')
race_len = race_len[0]
race_type = parts[2].lower()
return (race_no, race_len, race_type)
# filtering markets to those that fit the following criteria
def filter_market(market: bflw.MarketBook) -> bool:
d = market.market_definition
return (d is not None
and d.country_code == 'AU'
and d.market_type == 'WIN'
and (c := split_anz_horse_market_name(d.name)[2]) != 'trot' and c != 'pace' #strips out Harness Races
#and (c := split_anz_horse_market_name(d.name)[2]) == 'hcap'
and int(split_anz_horse_market_name(d.name)[1]) >= 1200 #filter by race length - cast to int so it isn't a string comparison
)
# Simply add the below variable name to the market filter function above with the filter value
# Equals (== 'Value in Quotation' or True/False/None), Does Not Equal (!= 'Value in Quotation' or True/False/None) - FOR ALL TYPES
# Greater than (>), Greater than or equal to (>=), Less than (<), Less than or equal to (<=) - FOR INT/FLOAT
# For a list of values use 'in'
# and d.betting_type: str - ODDS, ASIAN_HANDICAP_SINGLES, ASIAN_HANDICAP_DOUBLES or LINE
# and d.bsp_market: bool - True, False
# and d.country_code: str - list of codes can be found here: https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2 - Default value is 'GB' - Australia = 'AU', New Zealand = 'NZ'
# and d.event_id: str - PARENT_EVENT_ID
# and d.event_name: Optional[str] - Usually the name of the Match-Up (e.g. Bangladesh v Sri Lanka) or Race Meeting Name (e.g. Wangaratta (AUS) 1st Dec) - Note: Dictionaries don't support wildcard searches
# and d.event_type_id: str - SportID [Horse Racing - 7, Greyhounds - 4339]
# and d.market_base_rate: float - Market Commission Rate
# and d.market_type: str - e.g. "WIN"
# and d.name: Optional[str] - market name (e.g. R1 1170m Mdn)
# and d.number_of_active_runners: int - number of horses/dogs in the race
# and d.number_of_winners: int - Win market 1, Place markets 2+
# and d.turn_in_play_enabled: bool - True, False
# and d.venue: Optional[str] - Racing Only - Track
trading = betfairlightweight.APIClient(username = "username", password = "password", app_key="app_key")
listener = StreamListener(max_latency=None)
stream_files = glob.glob(file_directory+"*.tar")
selection_meta = file_directory+"metadata.csv"
prices_path = file_directory+"preplay.csv"
# rounding to 2 decimal places or returning '' if blank
def as_str(v) -> str:
return '%.2f' % v if (type(v) is float) or (type(v) is int) else v if type(v) is str else ''
# returning smaller of two numbers where min not 0
def min_gr0(a: float, b: float) -> float:
if a <= 0:
return b
if b <= 0:
return a
return min(a, b)
# parsing price data and pulling out weighted avg price, matched, min price and max price
def parse_traded(traded: List[PriceSize]) -> Tuple[float, float, float, float]:
if len(traded) == 0:
return (None, None, None, None)
(wavg_sum, matched, min_price, max_price) = functools.reduce(
lambda total, ps: (
total[0] + (ps.price * ps.size), # wavg_sum before we divide by total matched
total[1] + ps.size, # total matched
min(total[2], ps.price), # min price matched
max(total[3], ps.price), # max price matched
),
traded,
(0, 0, 1001, 0) # starting default values
)
wavg_sum = (wavg_sum / matched) if matched > 0 else None # dividing sum of wavg by total matched
matched = matched if matched > 0 else None
min_price = min_price if min_price != 1001 else None
max_price = max_price if max_price != 0 else None
return (wavg_sum, matched, min_price, max_price)
def load_markets(file_paths):
for file_path in file_paths:
print(file_path)
print("__ Parsing Detailed Prices ___ ")
if os.path.isdir(file_path):
for path in glob.iglob(file_path + '**/**/*.bz2', recursive=True):
f = bz2.BZ2File(path, 'rb')
yield f
f.close()
elif os.path.isfile(file_path):
ext = os.path.splitext(file_path)[1]
# iterate through a tar archive
if ext == '.tar':
with tarfile.TarFile(file_path) as archive:
for file in archive:
yield bz2.open(archive.extractfile(file))
# or a zip archive
elif ext == '.zip':
with zipfile.ZipFile(file_path) as archive:
for file in archive.namelist():
yield bz2.open(archive.open(file))
return None
# returning the price at ladder position n, or '' if that rung doesn't exist
def slicePrice(l, n):
try:
x = l[n].price
except (IndexError, AttributeError, TypeError):
x = ""
return(x)
# returning the size at ladder position n, or '' if that rung doesn't exist
def sliceSize(l, n):
try:
x = l[n].size
except (IndexError, AttributeError, TypeError):
x = ""
return(x)
def pull_ladder(availableLadder, n = 5):
out = {}
price = []
volume = []
if len(availableLadder) == 0:
return(out)
else:
for rung in availableLadder[0:n]:
price.append(rung.price)
volume.append(rung.size)
out["p"] = price
out["v"] = volume
return(out)
def final_market_book(s):
with patch("builtins.open", lambda f, _: f):
gen = s.get_generator()
for market_books in gen():
# Check if this market book meets our market filter ++++++++++++++++++++++++++++++++++
if ((evaluate_market := filter_market(market_books[0])) == False):
return(None)
for market_book in market_books:
last_market_book = market_book
return(last_market_book)
def loop_prices(s, o):
with patch("builtins.open", lambda f, _: f):
gen = s.get_generator()
marketID = None
tradeVols = None
time = None
for market_books in gen():
# Check if this market book meets our market filter ++++++++++++++++++++++++++++++++++
if ((evaluate_market := filter_market(market_books[0])) == False):
break
for market_book in market_books:
# Time Step Management ++++++++++++++++++++++++++++++++++
if marketID is None:
# No market initialised
marketID = market_book.market_id
time = market_book.publish_time
elif market_book.inplay:
# Stop once market inplay
break
else:
seconds_to_start = (market_book.market_definition.market_time - market_book.publish_time).total_seconds()
if seconds_to_start > log1_Start:
# Too early before off to start logging prices
continue
else:
# Update data at different time steps depending on seconds to off
wait = np.where(seconds_to_start <= log2_Start, log2_Step, log1_Step)
# New Market
if market_book.market_id != marketID:
marketID = market_book.market_id
time = market_book.publish_time
# (wait) seconds elapsed since last write
elif (market_book.publish_time - time).total_seconds() > wait:
time = market_book.publish_time
# fewer than (wait) seconds elapsed continue to next loop
else:
continue
# Execute Data Logging ++++++++++++++++++++++++++++++++++
for runner in market_book.runners:
try:
selection_status = runner.status
reduction_factor = runner.adjustment_factor
atb_ladder = pull_ladder(runner.ex.available_to_back, n = 5)
atl_ladder = pull_ladder(runner.ex.available_to_lay, n = 5)
spn = runner.sp.near_price
spf = runner.sp.far_price
except (AttributeError, TypeError): # e.g. runner.sp or a ladder is missing
selection_status = None
reduction_factor = None
atb_ladder = {}
atl_ladder = {}
spn = None
spf = None
# Calculate Current Traded Volume + Traded WAP
limitTradedVol = sum([rung.size for rung in runner.ex.traded_volume])
if limitTradedVol == 0:
limitWAP = ""
else:
limitWAP = sum([rung.size * rung.price for rung in runner.ex.traded_volume]) / limitTradedVol
limitWAP = round(limitWAP, 2)
o.writerow(
(
market_book.market_id,
market_book.number_of_active_runners,
runner.selection_id,
market_book.publish_time,
limitTradedVol,
limitWAP,
runner.last_price_traded or "",
selection_status,
reduction_factor,
str(atb_ladder).replace(' ',''),
str(atl_ladder).replace(' ',''),
str(spn),
str(spf)
)
)
def parse_prices(dir, out_file):
with open(out_file, "w+") as output:
writer = csv.writer(
output,
delimiter=',',
lineterminator='\r\n',
quoting=csv.QUOTE_ALL
)
writer.writerow(("market_id","active_runners","selection_id","time","traded_volume","wap","ltp","selection_status",'reduction_factor',"atb_ladder","atl_ladder","sp_near","sp_far"))
for file_obj in load_markets(dir):
stream = trading.streaming.create_historical_generator_stream(
file_path=file_obj,
listener=listener,
)
loop_prices(stream, writer)
#loop over each TAR file
for tar in stream_files:
parse_prices([tar], prices_path)
print("__ Parsing Market and Selection Data ___ ")
# record prices to a file
with open(selection_meta, "w") as output:
# defining column headers
output.write("market_id,event_date,country,track,event_name,selection_id,selection_name,result,bsp,pp_min,pp_max,pp_wap,pp_ltp,pp_volume,bsp_volume,ip_min,ip_max,ip_wap,ip_ltp,ip_volume\n")
for i, g in enumerate(bflw.Files([tar])):
print("Market {}".format(i), end='\r')
def get_pre_post_final():
eval_market = None
prev_market = None
preplay_market = None
postplay_market = None
for market_books in g:
for market_book in market_books:
# if market doesn't meet filter return out
if eval_market is None and ((eval_market := filter_market(market_book)) == False):
return (None, None, None)
# final market view before market goes in play
if prev_market is not None and prev_market.inplay != market_book.inplay:
preplay_market = prev_market
# final market view at the conclusion of the market
if prev_market is not None and prev_market.status == "OPEN" and market_book.status != prev_market.status:
postplay_market = market_book
# update reference to previous market
prev_market = market_book
return (preplay_market, postplay_market, prev_market) # prev is now final
(preplay_market, postplay_market, final_market) = get_pre_post_final()
# no price data for market
if postplay_market is None:
continue
preplay_traded = [ (r.last_price_traded, r.ex.traded_volume) for r in preplay_market.runners ] if preplay_market is not None else None
postplay_traded = [ (
r.last_price_traded,
r.ex.traded_volume,
# calculating SP traded vol as smaller of back_stake_taken or (lay_liability_taken / (bsp - 1))
min_gr0(
next((pv.size for pv in r.sp.back_stake_taken if pv.size > 0), 0),
next((pv.size for pv in r.sp.lay_liability_taken if pv.size > 0), 0) / ((r.sp.actual_sp if (type(r.sp.actual_sp) is float) or (type(r.sp.actual_sp) is int) else 0) - 1)
) if r.sp.actual_sp is not None else 0,
) for r in postplay_market.runners ]
runner_data = [
{
'selection_id': r.selection_id,
'selection_name': next((rd.name for rd in final_market.market_definition.runners if rd.selection_id == r.selection_id), None),
'selection_status': r.status,
'sp': as_str(r.sp.actual_sp),
}
for r in final_market.runners
]
# runner price data for markets that go in play
if preplay_traded is not None:
def runner_vals(r):
(pre_ltp, pre_traded), (post_ltp, post_traded, sp_traded) = r
inplay_only = list(filter(lambda ps: ps.size > 0, [
PriceSize(
price=post_ps.price,
size=post_ps.size - next((pre_ps.size for pre_ps in pre_traded if pre_ps.price == post_ps.price), 0)
)
for post_ps in post_traded
]))
(ip_wavg, ip_matched, ip_min, ip_max) = parse_traded(inplay_only)
(pre_wavg, pre_matched, pre_min, pre_max) = parse_traded(pre_traded)
return {
'preplay_ltp': as_str(pre_ltp),
'preplay_min': as_str(pre_min),
'preplay_max': as_str(pre_max),
'preplay_wavg': as_str(pre_wavg),
'preplay_matched': as_str(pre_matched or 0),
'bsp_matched': as_str(sp_traded or 0),
'inplay_ltp': as_str(post_ltp),
'inplay_min': as_str(ip_min),
'inplay_max': as_str(ip_max),
'inplay_wavg': as_str(ip_wavg),
'inplay_matched': as_str(ip_matched),
}
runner_traded = [ runner_vals(r) for r in zip_longest(preplay_traded, postplay_traded, fillvalue=PriceSize(0, 0)) ]
# runner price data for markets that don't go in play
else:
def runner_vals(r):
(ltp, traded, sp_traded) = r
(wavg, matched, min_price, max_price) = parse_traded(traded)
return {
'preplay_ltp': as_str(ltp),
'preplay_min': as_str(min_price),
'preplay_max': as_str(max_price),
'preplay_wavg': as_str(wavg),
'preplay_matched': as_str(matched or 0),
'bsp_matched': as_str(sp_traded or 0),
'inplay_ltp': '',
'inplay_min': '',
'inplay_max': '',
'inplay_wavg': '',
'inplay_matched': '',
}
runner_traded = [ runner_vals(r) for r in postplay_traded ]
# printing to csv for each runner
for (rdata, rprices) in zip(runner_data, runner_traded):
# defining data to go in each column
output.write(
"{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{},{}\n".format(
postplay_market.market_id,
postplay_market.market_definition.market_time,
postplay_market.market_definition.country_code,
postplay_market.market_definition.venue,
postplay_market.market_definition.name,
rdata['selection_id'],
rdata['selection_name'],
rdata['selection_status'],
rdata['sp'],
rprices['preplay_min'],
rprices['preplay_max'],
rprices['preplay_wavg'],
rprices['preplay_ltp'],
rprices['preplay_matched'],
rprices['bsp_matched'],
rprices['inplay_min'],
rprices['inplay_max'],
rprices['inplay_wavg'],
rprices['inplay_ltp'],
rprices['inplay_matched'],
)
)
#loading selection file and parsing dates
selection = pd.read_csv(selection_meta, dtype={'market_id': object, 'selection_id': object}, parse_dates = ['event_date'])
#loading price file and parsing dates
prices = pd.read_csv(
prices_path,
quoting=csv.QUOTE_ALL,
dtype={'market_id': 'string', 'selection_id': 'string', 'atb_ladder': 'string', 'atl_ladder': 'string'},
parse_dates=['time']
)
#creating the ladder as a dictionary
prices['atb_ladder'] = [ast.literal_eval(x) for x in prices['atb_ladder']]
prices['atl_ladder'] = [ast.literal_eval(x) for x in prices['atl_ladder']]
#merging the price and selection files
df = selection.merge(prices, on = ['market_id', 'selection_id'])
#assigning best prices available and calculating time relative to market start time
df = (
df
.assign(back_best = lambda x: [np.nan if d.get('p') is None else d.get('p')[0] for d in x['atb_ladder']])
.assign(lay_best = lambda x: [np.nan if d.get('p') is None else d.get('p')[0] for d in x['atl_ladder']])
.assign(seconds_before_scheduled_off = lambda x: round((x['event_date'] - x['time']).dt.total_seconds()))
.query('seconds_before_scheduled_off < @log1_Start')
)
#creating a unique list of market ids
marketids = df['market_id'].unique().tolist()
#writing each market to its own csv file
for market in marketids:
#create a dataframe and a naming convention for this market
pricing_data=df[(df['market_id']==market)]
if pricing_data.empty:
continue
race_track=pricing_data['track'].iloc[0]
market_name=pricing_data['event_name'].iloc[0]
market_time=pricing_data['event_date'].iloc[0]
off=market_time.strftime('%Y-%m-%d')
#write race details to the dataframe
pricing_data['race']=pricing_data['event_name'].str.split('R').str[1]
pricing_data['race']=pricing_data['race'].str.split(' ').str[0]
pricing_data['distance']=pricing_data['event_name'].str.split(' ').str[1]
pricing_data['distance']=pricing_data['distance'].str.split('m').str[0]
pricing_data['race_type']=pricing_data['event_name'].str.split('m ').str[1]
pricing_data['selection_name']=pricing_data['selection_name'].str.split(r'\. ').str[1]
#convert GMT timezone to AEST/AEDT
pricing_data['event_date']=pricing_data['event_date'].astype('datetime64[ns]')
pricing_data['event_date']=pricing_data['event_date'].dt.tz_localize('UTC',ambiguous=False)
pricing_data['event_date']=pricing_data['event_date'].dt.tz_convert('Australia/Melbourne')
pricing_data['event_date']=pricing_data['event_date'].dt.tz_localize(None)
pricing_data['time']=pricing_data['time'].astype('datetime64[ns]')
pricing_data['time']=pricing_data['time'].dt.tz_localize('UTC',ambiguous=False)
pricing_data['time']=pricing_data['time'].dt.tz_convert('Australia/Melbourne')
pricing_data['time']=pricing_data['time'].dt.tz_localize(None)
#convert GBP to AUD
event_date=(pd.to_datetime(pricing_data['event_date']).dt.date).iloc[0]
conversion_rate=CurrencyConverter(fallback_on_missing_rate=True).convert(1,'GBP','AUD',date=event_date)
pricing_data['traded_volume']=pricing_data['traded_volume']*conversion_rate
pricing_data.loc[(pricing_data['traded_volume'] < 0), 'traded_volume'] = 0
pricing_data['traded_volume'] = pricing_data['traded_volume'].round(decimals=2)
#reorder the dataframe and write to csv
pricing_data=pricing_data[['event_date','country','track','race','distance','race_type','market_id','selection_id','selection_name',"selection_status",'reduction_factor','result','bsp','time','traded_volume','wap','ltp','atb_ladder','atl_ladder','back_best','lay_best','seconds_before_scheduled_off','sp_near','sp_far','pp_min','pp_max','pp_wap','pp_ltp','pp_volume','bsp_volume','ip_min','ip_max','ip_wap','ip_ltp','ip_volume']]
pricing_data.to_csv(file_directory+off+' - '+race_track+' - '+market_name+'.csv',index=False)
#removing intermediate working documents to clean up
os.remove(selection_meta)
os.remove(prices_path)
2.2 Complete Code Sports Processor
import pandas as pd
import numpy as np
import os
import csv
import tarfile
import zipfile
import bz2
import glob
import ast
from unittest.mock import patch
import betfairlightweight
from betfairlightweight import StreamListener
from betfair_data import bflw #"Import "betfair_data.bflw" could not be resolved from source" - This is a known issue, the script should still run
from currency_converter import CurrencyConverter
from pandas.errors import SettingWithCopyWarning
import warnings
warnings.simplefilter(action="ignore", category=SettingWithCopyWarning)
file_directory = ''# INSERT FILE DIRECTORY WHERE TAR FILES ARE STORED
log1_Start = 60 * 60 # seconds before scheduled off to start recording data for data segment one
log1_Step = 60 # seconds between log steps for first data segment
log2_Start = 60 * 10 # seconds before scheduled off to start recording data for data segment two
log2_Step = 10 # seconds between log steps for second data segment
def filter_market(market: bflw.MarketBook) -> bool:
d = market.market_definition
return (
d is not None
# and d.country_code in ['ES']
# and d.market_type == 'MATCH_ODDS'
and d.name in ['Match Odds', '1st Innings 20 Overs Line']
# and d.betting_type == 'ODDS'
)
# Simply add the below variable name to the market filter function above with the filter value
# Equals (== 'Value in Quotation' or True/False/None), Does Not Equal (!= 'Value in Quotation' or True/False/None) - FOR ALL TYPES
# Greater than (>), Greater than or equal to (>=), Less than (<), Less than or equal to (<=) - FOR INT/FLOAT
# For a list of values use 'in'
# and d.betting_type: str - ODDS, ASIAN_HANDICAP_SINGLES, ASIAN_HANDICAP_DOUBLES or LINE
# and d.bsp_market: bool - True, False
# and d.country_code: str - list of codes can be found here: https://en.wikipedia.org/wiki/ISO_3166-1_alpha-2 - Default value is 'GB' - Australia = 'AU', New Zealand = 'NZ'
# and d.event_id: str - PARENT_EVENT_ID
# and d.event_name: Optional[str] - Usually the name of the Match-Up (e.g. Bangladesh v Sri Lanka) or Race Name (e.g. R6 1400m Grp1) - Note: Dictionaries don't support wildcard searches
# and d.event_type_id: str - SportID [Soccer - 1, Tennis - 2, Golf - 3, Cricket - 4, AFL - 61420]
# and d.market_base_rate: float - Market Commission Rate
# and d.market_type: str - e.g. "MATCH_ODDS", "1ST_INNINGS_RUNS","TO_QUALIFY" - always all caps with "_" replacing spaces
# and d.name: Optional[str] - market name (e.g. Sri Lanka 1st Inns Runs)
# and d.number_of_active_runners: int - Head-To-Heads markets will be 2
# and d.number_of_winners: int - Odds Markets usually 1, Line/Handicap markets usually 0
# and d.regulators: str - 'MR_INT' to remove ring-fenced exchange markets
# and d.turn_in_play_enabled: bool - True, False
trading = betfairlightweight.APIClient(username = "username", password = "password", app_key="app_key")
listener = StreamListener(max_latency=None)
stream_files = glob.glob(file_directory+"*.tar")
selection_meta = file_directory+"metadata.csv"
prices_path = file_directory+"preplay.csv"
# rounding to 2 decimal places or returning '' if blank
def as_str(v) -> str:
return '%.2f' % v if (type(v) is float) or (type(v) is int) else v if type(v) is str else ''
# returning smaller of two numbers where min not 0
def min_gr0(a: float, b: float) -> float:
if a <= 0:
return b
if b <= 0:
return a
return min(a, b)
def load_markets(file_paths):
for file_path in file_paths:
print(file_path)
print("__ Parsing Detailed Prices ___ ")
if os.path.isdir(file_path):
for path in glob.iglob(file_path + '**/**/*.bz2', recursive=True):
f = bz2.BZ2File(path, 'rb')
yield f
f.close()
elif os.path.isfile(file_path):
ext = os.path.splitext(file_path)[1]
# iterate through a tar archive
if ext == '.tar':
with tarfile.TarFile(file_path) as archive:
for file in archive:
yield bz2.open(archive.extractfile(file))
# or a zip archive
elif ext == '.zip':
with zipfile.ZipFile(file_path) as archive:
for file in archive.namelist():
yield bz2.open(archive.open(file))
return None
# returning the price at ladder position n, or '' if that rung doesn't exist
def slicePrice(l, n):
try:
x = l[n].price
except (IndexError, AttributeError, TypeError):
x = ""
return(x)
# returning the size at ladder position n, or '' if that rung doesn't exist
def sliceSize(l, n):
try:
x = l[n].size
except (IndexError, AttributeError, TypeError):
x = ""
return(x)
def pull_ladder(availableLadder, n = 3):
out = {}
price = []
volume = []
if len(availableLadder) == 0:
return(out)
else:
for rung in availableLadder[0:n]:
price.append(rung.price)
volume.append(rung.size)
out["p"] = price
out["v"] = volume
return(out)
def final_market_book(s):
with patch("builtins.open", lambda f, _: f):
gen = s.get_generator()
for market_books in gen():
# Check if this market book meets our market filter ++++++++++++++++++++++++++++++++++
if ((evaluate_market := filter_market(market_books[0])) == False):
return(None)
for market_book in market_books:
last_market_book = market_book
return(last_market_book)
def loop_prices(s, o):
with patch("builtins.open", lambda f, _: f):
gen = s.get_generator()
marketID = None
tradeVols = None
time = None
for market_books in gen():
# Check if this market book meets our market filter ++++++++++++++++++++++++++++++++++
if ((evaluate_market := filter_market(market_books[0])) == False):
break
for market_book in market_books:
# Time Step Management ++++++++++++++++++++++++++++++++++
if marketID is None:
# No market initialised
marketID = market_book.market_id
time = market_book.publish_time
elif market_book.status == "CLOSED":
# Stop once market settled
break
else:
seconds_to_start = (market_book.market_definition.market_time - market_book.publish_time).total_seconds()
if seconds_to_start > log1_Start:
# Too early before off to start logging prices
continue
else:
# Update data at different time steps depending on seconds to off
wait = np.where(seconds_to_start <= log2_Start, log2_Step, log1_Step)
# New Market
if market_book.market_id != marketID:
marketID = market_book.market_id
time = market_book.publish_time
# (wait) seconds elapsed since last write
elif (market_book.publish_time - time).total_seconds() > wait:
time = market_book.publish_time
# fewer than (wait) seconds elapsed continue to next loop
else:
continue
# Execute Data Logging ++++++++++++++++++++++++++++++++++
for runner in market_book.runners:
try:
atb_ladder = pull_ladder(runner.ex.available_to_back, n = 3)
atl_ladder = pull_ladder(runner.ex.available_to_lay, n = 3)
except (AttributeError, TypeError): # e.g. a ladder is missing
atb_ladder = {}
atl_ladder = {}
# Calculate Current Traded Volume + Traded WAP
limitTradedVol = sum([rung.size for rung in runner.ex.traded_volume])
if limitTradedVol == 0:
limitWAP = ""
else:
limitWAP = sum([rung.size * rung.price for rung in runner.ex.traded_volume]) / limitTradedVol
limitWAP = round(limitWAP, 2)
#Use this section to write rows that are required to join the metadata OR that will change in time
o.writerow(
(
market_book.market_id,
runner.selection_id,
market_book.publish_time,
market_book.inplay,
limitTradedVol,
limitWAP,
runner.last_price_traded or "",
str(atb_ladder).replace(' ',''),
str(atl_ladder).replace(' ','')
)
)
def parse_prices(dir, out_file):
with open(out_file, "w+") as output:
writer = csv.writer(
output,
delimiter=',',
lineterminator='\r\n',
quoting=csv.QUOTE_ALL
)
writer.writerow(("market_id","selection_id","time","inplay","traded_volume","wap","ltp","atb_ladder","atl_ladder"))
for file_obj in load_markets(dir):
stream = trading.streaming.create_historical_generator_stream(
file_path=file_obj,
listener=listener,
)
loop_prices(stream, writer)
#loop over each TAR file
for tar in stream_files:
parse_prices([tar], prices_path)
print("__ Parsing Market and Selection Data ___ ")
# record prices to a file
with open(selection_meta, "w") as output:
# defining column headers
output.write("market_id,market_time,market_type,event_name,market_name,selection_id,x,selection_name,y,result\n")
#loop over each market in the TAR file
for i, g in enumerate(bflw.Files([tar])):
print("Market {}".format(i), end='\r')
def get_pre_post_final():
eval_market = None
prev_market = None
preplay_market = None
postplay_market = None
for market_books in g:
for market_book in market_books:
# if market doesn't meet filter return out
if eval_market is None and ((eval_market := filter_market(market_book)) == False):
return (None, None, None)
# final market view before market goes in play
if prev_market is not None and prev_market.inplay != market_book.inplay:
preplay_market = prev_market
# final market view at the conclusion of the market
if prev_market is not None and prev_market.status == "OPEN" and market_book.status != prev_market.status:
postplay_market = market_book
# update reference to previous market
prev_market = market_book
return (preplay_market, postplay_market, prev_market) # prev is now final
(preplay_market, postplay_market, final_market) = get_pre_post_final()
# no price data for market
if postplay_market is None:
continue
preplay_traded = [ (r.last_price_traded, r.ex.traded_volume) for r in preplay_market.runners ] if preplay_market is not None else None
postplay_traded = [ (
r.last_price_traded,
r.ex.traded_volume,
# calculating SP traded vol as smaller of back_stake_taken or (lay_liability_taken / (BSP - 1))
min_gr0(
next((pv.size for pv in r.sp.back_stake_taken if pv.size > 0), 0),
next((pv.size for pv in r.sp.lay_liability_taken if pv.size > 0), 0) / ((r.sp.actual_sp if (type(r.sp.actual_sp) is float) or (type(r.sp.actual_sp) is int) else 0) - 1)
) if r.sp.actual_sp is not None else 0,
) for r in postplay_market.runners ]
# generic selection data
# note: the trailing commas below wrap each value in a one-element tuple;
# the extra 'x'/'y' columns in the header and the bracket-stripping applied
# after loading compensate for that formatting
for r in final_market.runners:
selection_id=r.selection_id,
selection_name=next((rd.name for rd in final_market.market_definition.runners if rd.selection_id == r.selection_id), None),
selection_status=r.status
# printing to csv for each selection
output.write(
"{},{},{},{},{},{},{},{}\n".format(
postplay_market.market_id,
postplay_market.market_definition.market_time,
postplay_market.market_definition.market_type,
postplay_market.market_definition.event_name,
postplay_market.market_definition.name,
selection_id,
selection_name, # a tuple (see note above) - its brackets/quotes are stripped after loading
selection_status
)
)
# loading selection file, parsing dates and cleaning the table
selection = pd.read_csv(
selection_meta, dtype={'market_id': object, 'selection_id': object}, parse_dates=['market_time']
)
selection = selection.set_axis(
[
'market_id',
'market_time',
'market_type',
'event_name',
'market_name',
'selection_id',
'x',
'selection_name',
'y',
'result'
],
axis=1
)
selection = selection[['market_id','market_time','market_type','event_name','market_name','selection_id','selection_name','result']]
selection['selection_id'] = selection['selection_id'].str.split(r'\(').str[1]
selection['selection_name'] = selection['selection_name'].str.split(r"\('").str[1]
selection['selection_name'] = selection['selection_name'].str.split("'").str[0]
# loading price file and parsing dates
prices = pd.read_csv(
prices_path,
quoting=csv.QUOTE_ALL,
dtype={'market_id': 'string', 'selection_id': 'string', 'atb_ladder': 'string', 'atl_ladder': 'string'},
parse_dates=['time']
)
# creating the ladder as a dictionary
prices['atb_ladder'] = [ast.literal_eval(x) for x in prices['atb_ladder']]
prices['atl_ladder'] = [ast.literal_eval(x) for x in prices['atl_ladder']]
# merging the price and selection files
df = selection.merge(prices, on=['market_id', 'selection_id'])
# assigning best prices available and calculating time relative to market start time
df = (
df
.assign(back_best=lambda x: [np.nan if d.get('p') is None else d.get('p')[0] for d in x['atb_ladder']])
.assign(lay_best=lambda x: [np.nan if d.get('p') is None else d.get('p')[0] for d in x['atl_ladder']])
.assign(
seconds_before_scheduled_off=lambda x: round((x['market_time'] - x['time']).dt.total_seconds())
)
.query('seconds_before_scheduled_off < @log1_Start')
)
# Writing each processed market to its own csv file
market_ids = df['market_id'].unique().tolist()
for market in market_ids:
# Create a dataframe and a naming convention for this market
pricing_data = df[df['market_id'] == market]
fixture = pricing_data['event_name'].iloc[0].replace(r"/","")
market_name = pricing_data['market_name'].iloc[0]
market_time = pricing_data['market_time'].iloc[0]
off = market_time.strftime('%Y-%m-%d')
# Convert GBP to AUD
event_date = (pd.to_datetime(pricing_data['market_time']).dt.date).iloc[0]
conversion_rate = CurrencyConverter(fallback_on_missing_rate=True).convert(1, 'GBP', 'AUD', date=event_date)
pricing_data['traded_volume'] = pricing_data['traded_volume'] * conversion_rate
pricing_data.loc[pricing_data['traded_volume'] < 0, 'traded_volume'] = 0
pricing_data['traded_volume'] = pricing_data['traded_volume'].round(decimals=2)
pricing_data.to_csv(file_directory + off + ' - ' + fixture + ' - ' + market_name + '.csv', index=False)
#removing intermediate working documents to clean up
os.remove(selection_meta)
os.remove(prices_path)
Disclaimer
Note that whilst models and automated strategies are fun and rewarding to create, we can't promise that your model or betting strategy will be profitable, and we make no representations in relation to the code shared or information on this page. If you're using this code or implementing your own strategies, you do so entirely at your own risk and you are responsible for any winnings/losses incurred. Under no circumstances will Betfair be liable for any loss or damage you suffer.