|
from __future__ import annotations |
|
|
|
import json |
|
import os |
|
from typing import * |
|
|
|
import numpy as np |
|
import openai |
|
import pandas as pd |
|
from openai.types.chat import ChatCompletion |
|
|
|
from googleapiutils2 import Drive, GoogleMimeTypes, Sheets, SheetSlice, get_oauth2_creds |
|
|
|
# Conditional import for type checking |
|
if TYPE_CHECKING: |
|
from googleapiclient._apis.drive.v3.resources import File |
|
from googleapiclient._apis.sheets.v4.resources import Spreadsheet |
|
|
|
|
|
# Function to clean and normalize responses from the OpenAI API.
# Occasionally, the API returns responses with extra quotes, newlines, or code blocks.
def strip_response(response: str) -> str:
    """Strip surrounding whitespace, quote characters, and markdown code fences.

    Fences (``` / ```json) are removed as whole tokens rather than via
    ``str.strip`` — ``strip("json")`` treats its argument as a *character set*
    {j, o, s, n} and can eat real payload characters at the edges.
    """
    quote_chars = "\"'“”"

    response = response.strip().strip(quote_chars).strip()

    # Remove a markdown code fence (``` or ```json ... ```) if present.
    if response.startswith("```"):
        response = response[3:]
        if response.startswith("json"):
            response = response[4:]
    if response.endswith("```"):
        response = response[:-3]

    # Final pass also drops stray single backticks around the payload.
    return response.strip().strip(quote_chars + "`").strip()
|
|
|
|
|
# Wrapper function to handle the response from the OpenAI API;
# it returns the response as a JSON object if it can be parsed, otherwise it returns None.
def handle_response(response: ChatCompletion) -> dict[str, str] | None:
    """Parse the JSON object out of a chat completion.

    Returns the parsed, non-empty JSON object, or None when the completion
    has no choices, no content, the content is not valid JSON, or the parsed
    value is not a non-empty object (callers index the result by key).
    """
    if not response.choices:
        return None

    content = response.choices[0].message.content

    if content is None:
        return None

    content = strip_response(content)

    try:
        data = json.loads(content)
    except json.JSONDecodeError:
        # The model emitted something that isn't JSON; treat as "no answer"
        # (narrowed from a blanket `except Exception` that hid real bugs).
        return None

    # Require a non-empty dict to honor the declared return type; the old
    # `len(...)` check would also have passed non-empty lists through.
    if isinstance(data, dict) and data:
        return data

    return None
|
|
|
|
|
def find_school_name_match(school_name: str, school_names: list[str]):
    """Fuzzy-match one school name against a candidate list via the OpenAI API.

    Returns the parsed JSON response (see ``handle_response``) or None.
    """
    candidates = "\n".join(school_names)

    # System prompt: instructs the model to answer with a JSON object whose
    # "best_match" key lists verbatim matches from the candidate list.
    system_msg = f"""Take the following list of input school names and input school name and fuzzy-find it within the input list.

Return the result as a JSON object with the following keys:
- best_match: the best match, a list of the best match or matches from the **input list verbatim**. If no match is found, return an empty list.
"""

    # The user message carries the actual data.
    content = f"""Input school name: {school_name}
Input school names:
{candidates}
"""

    messages = [
        {"role": "system", "content": system_msg},
        {"role": "user", "content": content},
    ]

    # gpt-3.5-turbo-1106 is the 3.5 variant that can emit JSON objects;
    # gpt-4-turbo would be the GPT-4 alternative. response_format is only
    # supported by some models, but makes structured output *much* more
    # reliable where available.
    completion = openai.chat.completions.create(
        model="gpt-3.5-turbo-1106",
        messages=messages,
        response_format={"type": "json_object"},
    )

    return handle_response(response=completion)
|
|
|
|
|
def reconcile_school_name(
    school_name: str,
    school_names_to_match_against: list[str],
):
    """Reconcile ``school_name`` against a candidate list via the GPT matcher.

    Returns a ``(index, name)`` tuple — the position and value of the best
    match within ``school_names_to_match_against`` — or None when the model
    returns nothing usable or the "match" is not verbatim from the list.
    """
    match = find_school_name_match(
        school_name=school_name, school_names=school_names_to_match_against
    )

    if match is None:
        return None

    # Guard against malformed model output: "best_match" may be absent, the
    # wrong type, or empty. isinstance is checked *before* emptiness so a
    # non-sized value (None, a number) cannot raise TypeError in len()/bool().
    best_match = match.get("best_match")
    if not isinstance(best_match, list) or not best_match:
        return None

    best_match = best_match[0]

    # Only accept answers copied verbatim from the candidate list; anything
    # else is a hallucination and is discarded.
    if best_match not in school_names_to_match_against:
        return None

    best_match_index = school_names_to_match_against.index(best_match)

    return (best_match_index, best_match)
|
|
|
|
|
# OpenAI auth: read the API key from the environment (raises KeyError if unset).
openai.api_key = os.environ["OPENAI_API_KEY"]

# Shared Google OAuth2 credentials for the Drive and Sheets clients.
creds = get_oauth2_creds()

# NOTE(review): `drive` is not used anywhere below — confirm it's needed.
drive = Drive(creds=creds)
sheets = Sheets(creds=creds)

# Spreadsheet holding the BEN-mapping, Switch, USAC-Entity and SVF-Entity tabs.
# NOTE(review): this is the full edit URL rather than a bare spreadsheet id —
# the googleapiutils2 client appears to accept it as-is; confirm.
file_id = "https://docs.google.com/spreadsheets/d/13-MyhvOiOcmCkdtviCY1kUGRNMBUhc2DEEJs545upP0/edit#gid=0"
|
|
|
|
|
# Pull the full BEN-mapping tab into a DataFrame.
ben_mapping_range = "Data Schools-BEN-Mapping"
ben_mapping_df = sheets.to_frame(
    sheets.values(
        spreadsheet_id=file_id,
        range_name=ben_mapping_range,
    )
)

# Rows where *both* entity numbers are already filled in (neither NaN nor "").
completed_bens = ben_mapping_df[
    (ben_mapping_df["USAC Entity #"].notna() & (ben_mapping_df["USAC Entity #"] != ""))
    & (ben_mapping_df["SVF Entity #"].notna() & (ben_mapping_df["SVF Entity #"] != ""))
]
|
|
|
|
|
# Load the "Switch" tab and drop rows without a school name.
switch_df = sheets.to_frame(
    sheets.values(
        spreadsheet_id=file_id,
        range_name="Switch",
    )
)
switch_df = switch_df[switch_df["School"].notna()]

# Unique switch school names, minus those already fully reconciled.
unique_switch_schools = switch_df["School"].unique()
switch_school_names = unique_switch_schools[
    ~np.isin(unique_switch_schools, completed_bens["School"].tolist())
].tolist()
|
|
|
|
|
# Load the USAC entity tab and deduplicate on the "Entity Number" column.
usac_entity_df = sheets.to_frame(
    sheets.values(
        spreadsheet_id=file_id,
        range_name="USAC-Entity",
    )
)
usac_entity_df = usac_entity_df.drop_duplicates(subset=["Entity Number"])

# Drop entities whose number is already reconciled in the mapping sheet.
completed_usac_numbers = completed_bens["USAC Entity #"].tolist()
usac_entity_df = usac_entity_df[
    ~usac_entity_df["Entity Number"].isin(completed_usac_numbers)
]

usac_entity_names = usac_entity_df["Entity Name"].tolist()
|
|
|
# Load the 2024 SVF entity tab and deduplicate on "School Entity Number".
svf_entity_df = sheets.to_frame(
    sheets.values(
        spreadsheet_id=file_id,
        range_name="2024 SVF-Entity",
    )
)
svf_entity_df = svf_entity_df.drop_duplicates(subset=["School Entity Number"])

# Drop entities whose number is already reconciled in the mapping sheet.
completed_svf_numbers = completed_bens["SVF Entity #"].tolist()
svf_entity_df = svf_entity_df[
    ~svf_entity_df["School Entity Number"].isin(completed_svf_numbers)
]

svf_entity_names = svf_entity_df["School Name"].tolist()
|
|
|
|
|
# Loop through the switch school names and fuzzy-find each one in the
# USAC Entity names and the SVF Entity names, then write the resolved
# entity numbers back into the BEN-mapping sheet row for that school.
for school_name in switch_school_names:
    print(f"Processing: {school_name}")

    ben_mapping_ix = ben_mapping_df["School"] == school_name

    # Entity number previously recorded on the E-Rate sheet for this school,
    # normalized to str; None when the cell is blank/NaN.
    original_entity_number = ben_mapping_df.loc[ben_mapping_ix][
        "E-Rate Sheets - School Entity #"
    ].iloc[0]
    original_entity_number = (
        str(original_entity_number) if not pd.isna(original_entity_number) else None
    )

    # Sheet row for this school: frame index + 2 converts the 0-based frame
    # position to the 1-based sheet row while skipping the header row.
    # (Assumes the frame index mirrors sheet order — TODO confirm.)
    row_ix = ben_mapping_df[ben_mapping_df["School"] == school_name].index[0]
    row_ix = int(row_ix) + 2

    row_slice = SheetSlice[ben_mapping_range, row_ix, ...]

    usac_entity_number: str | None = None
    svf_entity_number: str | None = None

    # Fuzzy-match against the not-yet-reconciled USAC entity names; the
    # returned index is positional within usac_entity_names, which was built
    # from usac_entity_df in order, so .iloc is the matching row.
    if (
        usac_match := reconcile_school_name(
            school_name=school_name,
            school_names_to_match_against=usac_entity_names,
        )
    ) is not None:
        usac_match_index, usac_match_name = usac_match
        usac_entity_number = str(usac_entity_df.iloc[usac_match_index]["Entity Number"])
        print(
            f"USAC match: {school_name} -> {usac_match_name} (index: {usac_match_index})"
        )

    # Same fuzzy match against the not-yet-reconciled SVF entity names.
    if (
        svf_match := reconcile_school_name(
            school_name=school_name,
            school_names_to_match_against=svf_entity_names,
        )
    ) is not None:
        svf_match_index, svf_match_name = svf_match
        svf_entity_number = str(
            svf_entity_df.iloc[svf_match_index]["School Entity Number"]
        )
        print(
            f"SVF match: {school_name} -> {svf_match_name} (index: {svf_match_index})"
        )

    # Whether either freshly-matched number agrees with the original one.
    # NOTE(review): when original_entity_number is None and a lookup also
    # failed (its number stays None), None == None makes this True — confirm
    # that is the intended meaning of "Any Original Match?".
    any_match = (
        original_entity_number == usac_entity_number
        or original_entity_number == svf_entity_number
    )

    row = {
        "School": school_name,
        "USAC Entity #": usac_entity_number,
        "SVF Entity #": svf_entity_number,
        "Any Original Match?": any_match,
    }

    # Write the reconciled values back to the mapping sheet in place.
    sheets.update(
        spreadsheet_id=file_id,
        range_name=row_slice,
        values=[row],
    )