import pandas as pd
from dateutil.relativedelta import relativedelta

# to_datetime / to_string are date helpers assumed to be defined elsewhere in the
# project; a sketch of the behaviour this class expects of them is at the bottom
# of this file

class RefinitivEarningsFetcher:
    MAX_RICS = 2000
    events_url = 'https://api.rkd.refinitiv.com/api/StreetEvents/StreetEvents.svc/REST/StreetEvents_2/GetEventHeadlines_1'

    def __init__(self, rconn):
        self.rconn = rconn
    # Builds the request payload for the event-headlines endpoint, converting the
    # start/end datetime objects to the API's string format
    def fetch_events_request_msg(self, curr_page, max_page_records, RICs, start=None, end=None):
        start, end = to_string(start, end)
        request_msg = {
            "GetEventHeadlines_Request_1": {
                "DateTimeRange": {
                    "From": start,
                    "To": end
                },
                "Pagination": {
                    "PageNumber": curr_page,
                    "RecordsPerPage": max_page_records
                },
                "ContextCodes": {
                    "Type": "Symbol",
                    "Scheme": "RIC",
                    "Values": {
                        "Value": RICs
                    }
                },
                "EventTypes": {
                    "EventType": [
                        "EarningsCallsAndPresentations"
                    ]
                },
                "ContentFilters": {
                    "WebcastFilter": [
                        {
                            "status": "Available"
                        }
                    ]
                },
                "UTCIndicatorInResponse": "true"
            }
        }
        return request_msg
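
    # For reference, do_fetch_events below unpacks the response JSON along these
    # paths (field names taken from the parsing code; real payloads carry many
    # more fields per headline):
    #
    # {
    #     "GetEventHeadlines_Response_1": {
    #         "PaginationResult": {"TotalRecords": 250},
    #         "EventHeadlines": {"Headline": [{"EventId": "...", ...}, ...]}
    #     }
    # }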
    # Retrieves all pages of event headlines between the given start and end dates
    def do_fetch_events(self, start, end, RICs):
        curr_page = 1
        max_page_records = 100
        events = pd.DataFrame()
        # Loop through pages of records, appending to events, until we have
        # fetched every record the server reports
        while True:
            request_msg = self.fetch_events_request_msg(curr_page, max_page_records, RICs, start, end)
            response = self.rconn.sendRequest(self.events_url, request_msg)
            response_info = response.json()['GetEventHeadlines_Response_1']
            total_records = response_info['PaginationResult']['TotalRecords']
            # If there are no events, don't bother appending
            if total_records == 0:
                break
            events_data = response_info['EventHeadlines']['Headline']
            events_new = pd.json_normalize(events_data)
            events = pd.concat([events, events_new], ignore_index=True)
            # Stop once the pages fetched so far cover the total record count
            if curr_page * max_page_records >= total_records:
                break
            curr_page += 1
        return events
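
    # Worked example of the termination check: with TotalRecords == 250 and
    # RecordsPerPage == 100, pages 1 and 2 give 100 and 200 records fetched
    # (both < 250), while page 3 gives 300 >= 250, so the loop stops after
    # three requests have covered all 250 records.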
    # Retrieves event headlines between the given start and end dates, tidied
    # into a DataFrame with one row per event instance
    def fetch_events(self, RICs, start=None, end=None, drop_meta=True, save_path=None, listings_path=None):
        events = pd.DataFrame()
        start, end = to_datetime(start, end)
        print('Fetching Earnings Events from %s to %s' % to_string(start, end))
        # Outer loop: RICs in chunks of MAX_RICS; inner loop: 5-year windows
        # from the start date until we have covered the end
        for i in range(0, len(RICs), self.MAX_RICS):
            RIC_slice = RICs[i : i + self.MAX_RICS]
            cstart = start
            cend = min(end, cstart + relativedelta(years=5))
            while True:
                events_new = self.do_fetch_events(cstart, cend, RIC_slice)
                events = pd.concat([events, events_new], ignore_index=True)
                if cend >= end:
                    break
                # Adjacent windows share a boundary, so boundary events can be
                # fetched twice; hence the instance vs unique count reported below
                cstart = cend
                cend = min(end, cstart + relativedelta(years=5))
        if events.empty:
            print('No events found in the requested range')
            return events
        print(f"Fetched {len(events)} Instances of {events['EventId'].nunique()} Unique Events")
        # Select the transcript id if one exists, otherwise fall back to the
        # replay webcast id, then the live webcast id
        events['TranscriptId'] = None
        if 'LiveWebcast.WebcastId' in events.columns:
            events.loc[events['LiveWebcast.WebcastId'].notnull(), 'TranscriptId'] = events['LiveWebcast.WebcastId']
        if 'ReplayWebcast.WebcastId' in events.columns:
            events.loc[events['ReplayWebcast.WebcastId'].notnull(), 'TranscriptId'] = events['ReplayWebcast.WebcastId']
        if 'Transcript.TranscriptId' in events.columns:
            events.loc[events['Transcript.TranscriptId'].notnull(), 'TranscriptId'] = events['Transcript.TranscriptId']
        # Grab the RIC from the first listed symbol in the organization JSON
        events['RIC'] = events['Organization.Symbols.Symbol'].apply(lambda x: x[0]['Value'])
        # If drop_meta then keep only the important fields
        if drop_meta:
            events = events[['EventId', 'Organization.Name', 'Name', 'TranscriptId', 'LastUpdate', 'Duration.StartDateTime', 'RIC', 'CountryCode']]
        # Rename for interpretability
        events = events.rename(columns={'Organization.Name': 'Company', 'Name': 'EventName', 'LastUpdate': 'ReportingDate', 'Duration.StartDateTime': 'EventTime'})
        # Join with GICS codes by RIC
        if listings_path is not None:
            ric_gic_map = pd.read_csv(listings_path)[['RIC', 'GicsCode', 'Sector', 'Industry Group', 'Industry', 'Sub-Industry']]
            events = events.merge(ric_gic_map, how='left', on='RIC')
        # Convert date columns to datetime format
        events['ReportingDate'] = pd.to_datetime(events['ReportingDate'])
        events['EventTime'] = pd.to_datetime(events['EventTime'])
        if save_path is not None:
            events.to_csv(save_path, index=False)
        return events
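
    # merge_events below supports incremental refreshes: fetch a recent window
    # and fold it into a previously saved frame without duplicating events.
    # A sketch, assuming `last_run` holds the datetime of the previous fetch:
    #
    # old = pd.read_csv('earnings_events.csv')
    # new = fetcher.fetch_events(RICs, start=last_run)
    # events = RefinitivEarningsFetcher.merge_events(old, new)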
    # Combines two event frames, dropping duplicates by EventId (rows from
    # old_events win, since drop_duplicates keeps the first occurrence)
    @staticmethod
    def merge_events(old_events, new_events):
        events = pd.concat([old_events, new_events], ignore_index=True)
        events = events.drop_duplicates(subset='EventId')
        return events
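
# The class relies on two date helpers defined elsewhere in the project. A
# minimal sketch of the behaviour it assumes of them: to_datetime normalizes an
# optional (start, end) pair to datetime objects, and to_string renders such a
# pair in the string form the API payload expects. Both the default window and
# the ISO-8601 timestamp format below are assumptions, not the project's actual
# helpers.
from datetime import datetime

def to_datetime(start, end):
    # Default to the trailing five years when no range is given (assumed default)
    end = end or datetime.utcnow()
    start = start or end - relativedelta(years=5)
    return start, end

def to_string(start, end):
    # Render each endpoint as an ISO-8601 timestamp, passing None through
    fmt = '%Y-%m-%dT%H:%M:%S'
    return (start.strftime(fmt) if start else None,
            end.strftime(fmt) if end else None)

# Example usage, assuming `rconn` is an authenticated RKD connection whose
# sendRequest(url, payload) posts the JSON payload and returns a requests-style
# response object:
#
# fetcher = RefinitivEarningsFetcher(rconn)
# events = fetcher.fetch_events(['AAPL.O', 'MSFT.O'],
#                               start=datetime(2018, 1, 1),
#                               end=datetime(2023, 1, 1),
#                               save_path='earnings_events.csv')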