import os
from collections import defaultdict

import pandas as pd
from bs4 import BeautifulSoup, NavigableString
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from tqdm import tqdm

# get_basic_keys_and_quals / get_subindices_keys_and_quals are project-local helpers;
# the module name below is an assumption, adjust it to where they live in this repo.
from keywords import get_basic_keys_and_quals, get_subindices_keys_and_quals


class RefinitivTranscriptAnalyser:
    def __init__(self, input_words_path='Keywords/input_cost_words.csv', transcripts_dir_path='Transcripts',
                 listings_path='Listings/asx_listings.csv', events_path='Events/events.csv') -> None:
        self.transcripts_dir_path = transcripts_dir_path
        self.listings_path = listings_path
        self.input_words_path = input_words_path
        self.events_path = events_path
        # lazily initialised in lemmatize_phrase, so NLTK data is only required when used
        self.lemmatizer = None
    def get_keyword_counts_from_transcripts(self, input_words_path=None, add_meta=True, paras=True):
        # collect the keywords, their qualifiers, and the combined set of all qualifiers
        input_words_path = self.input_words_path if input_words_path is None else input_words_path
        keywords_to_qualifiers = get_basic_keys_and_quals(input_words_path)
        keywords = set(keywords_to_qualifiers.keys())
        qualifiers = set.union(*[set(qualifiers) for qualifiers in keywords_to_qualifiers.values()])
        # for each transcript in the path, add its keyword counts to the df below
        transcript_counts = pd.DataFrame(columns=['matches', 'total_words', 'quotes_with_matches', 'quotes'])
        # only keep transcripts that appear in the events table
        transcripts = os.listdir(self.transcripts_dir_path)
        events = pd.read_csv(self.events_path)
        active_transcripts = {str(t) + '.txt' for t in events['TranscriptId']}
        transcripts = [transcript for transcript in transcripts if transcript in active_transcripts]
        print(f'Processing {len(transcripts)} transcripts')
        for transcript in tqdm(transcripts):
            transcript_id = transcript[:-4]  # strip the '.txt' extension
            with open(f'{self.transcripts_dir_path}/{transcript}') as file:
                transcript_xml = file.read()
            if transcript_xml:
                if paras:
                    transcript_quotes = self.get_paras_from_xml(transcript_xml)
                else:
                    transcript_quotes = self.get_sentences_from_xml(transcript_xml)
                transcript_counts.loc[transcript_id] = self.count_keywords_basic(
                    transcript_quotes, keywords, qualifiers, keywords_to_qualifiers)
        if not add_meta:
            return transcript_counts
        transcript_counts = self.add_event_meta(transcript_counts)
        transcript_counts = transcript_counts.set_index('EventTime', drop=True)
        # parse the EventTime index into datetimes, matching get_counts_by_subindex_sector
        transcript_counts.index = pd.to_datetime(transcript_counts.index)
        return transcript_counts
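
    # Example for get_keyword_counts_from_transcripts (illustrative; assumes the default
    # Keywords/Transcripts/Events layout exists on disk):
    #   analyser = RefinitivTranscriptAnalyser()
    #   counts = analyser.get_keyword_counts_from_transcripts()
    #   counts['match_rate'] = counts['matches'] / counts['total_words']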
    # tokenizes a phrase, lemmatizes each token of 2+ characters, and rejoins them with spaces
    def lemmatize_phrase(self, phrase):
        if self.lemmatizer is None:
            self.lemmatizer = WordNetLemmatizer()
        return ' '.join([self.lemmatizer.lemmatize(w) for w in word_tokenize(phrase) if len(w) >= 2])
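
    # Example for lemmatize_phrase: lemmatize_phrase('rising costs of goods') -> 'rising cost of good'
    # (exact output depends on the NLTK WordNet data installed)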
    # within a qualifier group, lemmatizes and sorts the qualifiers, returning the group as a tuple;
    # this stops unordered qualifier groups with the same members being treated as different
    def sort_lemmatize_tuple_quals(self, quals):
        return tuple(sorted(self.lemmatize_phrase(qual) for qual in quals))
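
    # Example for sort_lemmatize_tuple_quals: both sort_lemmatize_tuple_quals(['prices', 'wages'])
    # and sort_lemmatize_tuple_quals(['wages', 'prices']) give ('price', 'wage'); tuples are
    # hashable, so equal groups share one cache entry in the counting methods below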
    def get_counts_by_subindex_sector(self, input_words_path=None):
        input_words_path = self.input_words_path if input_words_path is None else input_words_path
        # collect the keywords, their qualifiers, and subindex labels, then lemmatize everything
        keys_subs, keys_quals = get_subindices_keys_and_quals(input_words_path)
        l_keys = [self.lemmatize_phrase(phrase) for phrase in keys_quals.keys()]
        l_quals = [self.sort_lemmatize_tuple_quals(quals) for quals in keys_quals.values()]
        # keys_subs and keys_quals share the same keys, so zipping preserves the pairing
        l_keys_subs = {key: sub for key, sub in zip(l_keys, keys_subs.values())}
        l_keys_quals = {key: qual for key, qual in zip(l_keys, l_quals)}
        # process each transcript in the path, but only if it appears in our events table
        events = pd.read_csv(self.events_path)
        transcripts = os.listdir(self.transcripts_dir_path)
        active_transcripts = {str(t) + '.txt' for t in events['TranscriptId']}
        transcripts = [transcript for transcript in transcripts if transcript in active_transcripts]
        # collect per-transcript counts, then concatenate once at the end
        frames = []
        for transcript in tqdm(transcripts):
            transcript_id = transcript[:-4]  # strip the '.txt' extension
            with open(f'{self.transcripts_dir_path}/{transcript}') as file:
                transcript_xml = file.read()
            if len(transcript_xml) == 0:
                continue
            l_quotes = self.get_sentences_from_xml(transcript_xml).apply(self.lemmatize_phrase)
            subindex_matches = self.get_matches_by_subindex(l_quotes, l_keys_subs, l_keys_quals)
            subindex_matches.index = [transcript_id] * len(subindex_matches)
            frames.append(subindex_matches)
        transcript_counts = pd.concat(frames)
        keep_cols = ['TranscriptId', 'EventTime', 'Sector', 'subindex', 'matches', 'total_words']
        transcript_counts = self.add_event_meta(transcript_counts, keep_cols=keep_cols)
        transcript_counts['EventTime'] = pd.to_datetime(transcript_counts['EventTime'])
        return transcript_counts.set_index('EventTime')
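
    # Example for get_counts_by_subindex_sector (illustrative): the result has one row per
    # (transcript, subindex) pair, indexed by EventTime, so
    #   counts.groupby(['Sector', 'subindex'])['matches'].sum()
    # gives sector-level totals per subindex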
    # merges event metadata onto a counts df whose index is the transcript id
    def add_event_meta(self, df: pd.DataFrame, keep_cols=None) -> pd.DataFrame:
        events = pd.read_csv(self.events_path)
        # the counts index holds transcript ids as strings, while TranscriptId is typically
        # read as an integer column, so align the types before merging
        events['TranscriptId'] = events['TranscriptId'].astype(str)
        df = df.merge(events, how='left', left_on=df.index, right_on='TranscriptId')
        if keep_cols is None:
            keep_cols = ['TranscriptId', 'EventTime', 'Sector', 'matches', 'total_words', 'quotes_with_matches', 'quotes']
        return df[keep_cols]
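
    # Example for add_event_meta: given events.csv columns TranscriptId, EventTime and
    # Sector (as used above), add_event_meta(counts) attaches the event time and sector
    # for each transcript id in the index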
    @staticmethod
    def count_keywords_basic(quotes: pd.Series, keywords, qualifiers, keywords_to_qualifiers) -> tuple:
        total_words = 0
        quotes_with_matches = 0
        matches = 0
        for quote in quotes:
            # drop standalone 'our' tokens from the quote before matching
            quote = ' '.join([word for word in quote.split(' ') if word != 'our'])
            # increment total words by the length of each quote
            total_words += len(quote.split(' '))
            # record which keywords/qualifiers appear (substring presence, not a count)
            wordcounts = defaultdict(int)
            for word in (keywords | qualifiers):
                if word in quote:
                    wordcounts[word] += 1
            # for each keyword present, check whether any of its qualifiers also appear;
            # qualifier groups are cached so a set shared by several keywords is only checked once
            qual_set_present = {}
            quote_matched = False
            for keyword in keywords:
                if wordcounts[keyword]:
                    keyword_qualifiers = keywords_to_qualifiers[keyword]
                    if keyword_qualifiers not in qual_set_present:
                        qual_set_present[keyword_qualifiers] = any(wordcounts[qual] for qual in keyword_qualifiers)
                    # a qualified keyword counts as one match
                    if qual_set_present[keyword_qualifiers]:
                        matches += 1
                        quote_matched = True
            # count each quote at most once, however many keywords it matched
            quotes_with_matches += 1 if quote_matched else 0
        return (matches, total_words, quotes_with_matches, len(quotes))
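
    # Illustrative walk-through of count_keywords_basic: for quotes
    # ['cost pressure rising', 'great quarter'] with keyword 'cost' qualified by
    # ('pressure',), this returns (1, 5, 1, 2):
    # 1 qualified match, 5 total words, 1 quote with a match, 2 quotes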
    # aggregates per-keyword matches up to their subindex groups
    def get_matches_by_subindex(self, l_quotes, l_keys_subs, l_keys_quals):
        keyword_matches = self.get_matches_by_keyword(l_quotes, l_keys_quals)
        keyword_matches['subindex'] = keyword_matches.index.map(l_keys_subs)
        subindex_matches = keyword_matches.set_index('subindex', drop=True)
        subindex_matches = subindex_matches.groupby('subindex').sum()
        # total_words is the same for every keyword row, so the mean just recovers the
        # transcript's word count rather than summing it once per keyword
        subindex_matches['total_words'] = int(keyword_matches['total_words'].mean())
        return subindex_matches.reset_index()
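
    # Example for get_matches_by_subindex (hypothetical labels): keywords 'wage' and
    # 'salary' both mapped to subindex 'labour' have their match counts summed into a
    # single 'labour' row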
    @staticmethod
    def get_matches_by_keyword(l_quotes: pd.Series, l_keys_quals) -> pd.DataFrame:
        keywords = set(l_keys_quals.keys())
        qualifiers = set.union(*[set(quals) for quals in l_keys_quals.values()])
        total_words = 0
        keyword_matches = defaultdict(int)
        for quote in l_quotes:
            # drop standalone 'our' tokens from the quote before matching
            quote = ' '.join([word for word in quote.split(' ') if word != 'our'])
            # increment total words by the length of each quote
            total_words += quote.count(' ') + 1
            wordcounts = defaultdict(int)
            quote_word_list = quote.split(' ')
            for word in (keywords | qualifiers):
                if word.count(' '):
                    wordcounts[word] += quote.count(word)  # substring count for bi-grams / n-grams
                else:
                    wordcounts[word] += quote_word_list.count(word)  # exact token count for single words
            # for each keyword present, check whether any of its qualifiers also appear;
            # qualifier groups are cached so a set shared by several keywords is only checked once
            qual_set_present = {}
            for keyword in keywords:
                keyword_matches[keyword]  # touch the defaultdict so every keyword gets a row, even with zero matches
                if wordcounts[keyword]:
                    keyword_qualifiers = l_keys_quals[keyword]
                    if keyword_qualifiers not in qual_set_present:
                        qual_set_present[keyword_qualifiers] = any(wordcounts[qual] for qual in keyword_qualifiers)
                    # a qualified keyword counts as one match for this quote
                    keyword_matches[keyword] += qual_set_present[keyword_qualifiers]
        result = pd.DataFrame.from_dict(keyword_matches, orient='index', columns=['matches'])
        result['total_words'] = total_words
        return result
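
    # Example result of get_matches_by_keyword (values illustrative): with
    # l_keys_quals = {'cost': ('pressure',)} and quotes containing a qualified 'cost',
    # the returned frame looks like:
    #         matches  total_words
    #   cost        1           42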
    # given an XML transcript as text, fetches the paragraph-level quotes from every
    # section of the transcript (one entry per <br/>-separated paragraph within a Turn)
    @staticmethod
    def get_paras_from_xml(xml: str) -> pd.Series:
        if not xml:
            return pd.Series(dtype=object)
        # strip the encoding declaration so BeautifulSoup parses the text as-is
        xml = xml.replace('<?xml version="1.0" encoding="ISO-8859-1"?>\n', '')
        soup = BeautifulSoup(xml, 'xml')
        quotes = []
        for section in soup.find_all('Section'):
            for turn in section.find_all('Turn'):
                quote = ''
                for item in turn.contents:
                    # a <br> marks a paragraph boundary; flush the text gathered so far
                    if getattr(item, 'name', None) == 'br':
                        quotes.append(quote.strip())
                        quote = ''
                    elif isinstance(item, NavigableString):
                        quote += str(item)
                if quote:
                    quotes.append(quote.strip())
        return pd.Series(quotes).str.lower()
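
    # Example for get_paras_from_xml:
    # '<Section><Turn>first para<br/>second para</Turn></Section>' yields
    # pd.Series(['first para', 'second para'])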
    # given an XML transcript as text, fetches the quote fragments from every section;
    # note: this yields BeautifulSoup's stripped text fragments per Turn, not NLTK
    # sentence tokens, despite the name
    @staticmethod
    def get_sentences_from_xml(xml: str) -> pd.Series:
        if not xml:
            return pd.Series(dtype=object)
        # strip the encoding declaration so BeautifulSoup parses the text as-is
        xml = xml.replace('<?xml version="1.0" encoding="ISO-8859-1"?>\n', '')
        soup = BeautifulSoup(xml, 'xml')
        quotes = []
        for section in soup.find_all('Section'):
            for turn in section.find_all('Turn'):
                for string in turn.stripped_strings:
                    quotes.append(string)
        return pd.Series(quotes).str.lower()
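

# Minimal usage sketch, assuming the default directory layout
# (Keywords/, Transcripts/, Events/) is present alongside this script:
if __name__ == '__main__':
    analyser = RefinitivTranscriptAnalyser()
    # paragraph-level counts with event metadata attached
    counts = analyser.get_keyword_counts_from_transcripts(paras=True)
    print(counts.head())
    # sentence-level counts broken down by subindex and sector
    subindex_counts = analyser.get_counts_by_subindex_sector()
    print(subindex_counts.head())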