Created
June 18, 2025 22:02
-
-
Save badjano/7dba7f77ee306e548affe6bb6a8ddee1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import csv | |
| import operator | |
| from collections import defaultdict,Counter | |
class DataFrame:
    """A minimal, pure-stdlib tabular container.

    Rows are stored in ``self.data`` as a list of dicts keyed by column
    name; ``self.columns`` holds the column names in display order.
    Every row carries exactly the keys in ``self.columns``; values that a
    source row did not supply are stored as ``None``.
    """

    def __init__(self, data, columns=None):
        """Build a frame from rows.

        Args:
            data: Rows, either as sequences (positionally matched against
                ``columns``) or as dicts.
            columns: Column names. Optional only when ``data`` is a
                non-empty sequence whose first element is a dict; the
                columns are then the keys of all rows in first-seen order.

        Raises:
            ValueError: If ``columns`` is omitted and ``data`` is empty or
                does not start with a dict.
        """
        if columns is not None:
            self.columns = list(columns)
        elif data and isinstance(data[0], dict):
            self.columns = self._collect_columns(data)
        else:
            raise ValueError("Provide columns with data or data as dicts")
        self.data = [self._make_row(row) for row in data]

    @staticmethod
    def _collect_columns(rows, base=()):
        """Return ``base`` plus every key of the dict rows, in first-seen order."""
        cols = list(base)
        seen = set(cols)
        for row in rows:
            if isinstance(row, dict):
                for name in row:
                    if name not in seen:
                        seen.add(name)
                        cols.append(name)
        return cols

    @staticmethod
    def _as_cols(cols):
        """Normalize a column spec to a tuple; a bare string is one column."""
        return (cols,) if isinstance(cols, str) else tuple(cols)

    def _make_row(self, row):
        """Convert one input row into a fresh dict keyed by ``self.columns``."""
        if isinstance(row, dict):
            # Zipping a dict would pair columns with the dict's *keys*, so
            # dict rows are projected by name instead (missing -> None).
            return {c: row.get(c) for c in self.columns}
        return dict(zip(self.columns, row))

    @classmethod
    def from_csv(cls, fp, delimiter=",", header=True):
        """Read a CSV file into a frame; all values are kept as strings.

        Args:
            fp: Path of the file to read.
            delimiter: Field delimiter.
            header: If true, the first line supplies the column names.
                Otherwise columns are named ``"0"``, ``"1"``, ... up to the
                width of the widest row.
        """
        with open(fp, newline='') as f:
            rdr = csv.reader(f, delimiter=delimiter)
            if header:
                # Default of [] makes an empty file yield an empty frame
                # instead of leaking StopIteration.
                cols = next(rdr, [])
                rows = list(rdr)
            else:
                rows = list(rdr)
                width = max((len(r) for r in rows), default=0)
                cols = [str(i) for i in range(width)]
        return cls(rows, cols)

    def to_csv(self, fp, delimiter=","):
        """Write the frame to ``fp`` as CSV with a header line."""
        with open(fp, "w", newline='') as f:
            w = csv.DictWriter(f, fieldnames=self.columns, delimiter=delimiter)
            w.writeheader()
            w.writerows(self.data)

    def filter(self, fn):
        """Return a new frame with the rows for which ``fn(row)`` is truthy."""
        return DataFrame([r for r in self.data if fn(r)], self.columns)

    def select(self, *cols):
        """Return a new frame containing only ``cols``, in the given order."""
        return DataFrame([[r[c] for c in cols] for r in self.data], cols)

    def sort(self, *cols, reverse=False):
        """Return a new frame sorted by the values of ``cols``."""
        ordered = sorted(
            self.data,
            key=lambda r: tuple(r[c] for c in cols),
            reverse=reverse,
        )
        return DataFrame(ordered, self.columns)

    def head(self, n=5):
        """Return a new frame with the first ``n`` rows."""
        return DataFrame(self.data[:n], self.columns)

    def tail(self, n=5):
        """Return a new frame with the last ``n`` rows."""
        # data[-0:] is the whole list, so n == 0 needs its own branch.
        rows = self.data[-n:] if n else []
        return DataFrame(rows, self.columns)

    def groupby(self, *cols):
        """Group rows by ``cols``.

        Returns:
            A dict mapping each key tuple (the values of ``cols``) to a
            DataFrame of the matching rows, in first-seen key order.
        """
        groups = defaultdict(list)
        for row in self.data:
            groups[tuple(row[c] for c in cols)].append(row)
        return {k: DataFrame(v, self.columns) for k, v in groups.items()}

    def agg(self, group_cols, agg_col, func):
        """Aggregate ``agg_col`` within each group of ``group_cols``.

        Args:
            group_cols: Column name or iterable of column names to group by.
            agg_col: Column whose values are converted with ``float`` and
                passed to ``func`` as a list.
            func: Callable reducing a list of floats to one value.

        Returns:
            A frame with the group columns followed by ``agg_col``.
        """
        group_cols = self._as_cols(group_cols)
        result = []
        for key, df in self.groupby(*group_cols).items():
            vals = [float(r[agg_col]) for r in df.data]
            row = dict(zip(group_cols, key))
            row[agg_col] = func(vals)
            result.append(row)
        return DataFrame(result, list(group_cols) + [agg_col])

    def count(self, by=None):
        """Count rows.

        Returns:
            The number of rows when ``by`` is falsy; otherwise a list of
            dicts, one per distinct combination of the ``by`` columns, each
            holding those column values plus a ``"count"`` entry.
        """
        if by:
            by = self._as_cols(by)
            c = Counter(tuple(r[k] for k in by) for r in self.data)
            return [{**dict(zip(by, k)), "count": v} for k, v in c.items()]
        return len(self.data)

    def unique(self, col):
        """Return the set of distinct values in ``col``."""
        return {r[col] for r in self.data}

    def apply(self, col, fn):
        """Replace every value of ``col`` with ``fn(value)`` in place; return self."""
        for r in self.data:
            r[col] = fn(r[col])
        return self

    def map(self, fn):
        """Return a new frame built from ``fn(row)`` for every row.

        ``fn`` receives a copy of each row, so this frame is not modified.
        Keys that ``fn`` adds to dict results become extra columns appended
        after the existing ones.
        """
        rows = [fn(dict(r)) for r in self.data]
        return DataFrame(rows, self._collect_columns(rows, self.columns))

    def to_list(self):
        """Return the rows as lists of values in column order."""
        return [[r[c] for c in self.columns] for r in self.data]

    def to_dicts(self):
        """Return the rows as a list of new dicts."""
        return [dict(r) for r in self.data]

    def __getitem__(self, key):
        """Index by row number, column name, or slice.

        Returns:
            The row dict for an ``int``, the list of column values for a
            ``str``, or a new DataFrame for a ``slice``.

        Raises:
            KeyError: If ``key`` is none of those types.
        """
        if isinstance(key, int):
            return self.data[key]
        elif isinstance(key, str):
            return [r[key] for r in self.data]
        elif isinstance(key, slice):
            return DataFrame(self.data[key], self.columns)
        else:
            raise KeyError("Invalid key")

    def __len__(self):
        """Return the number of rows."""
        return len(self.data)

    def __repr__(self):
        """Return a tab-separated preview: header, up to five rows, then ``...``."""
        lines = ["\t".join(self.columns)]
        for r in self.data[:5]:
            lines.append("\t".join(str(r[c]) for c in self.columns))
        if len(self.data) > 5:
            lines.append("...")
        return "\n".join(lines)

    def describe(self):
        """Summarize the columns whose every value converts to ``float``.

        Returns:
            A dict mapping each such column to its ``count``, ``mean``,
            ``min`` and ``max``. For a frame with no rows every column is
            reported with count 0, mean 0 and ``None`` min/max.
        """
        desc = {}
        num_cols = [
            c for c in self.columns
            if all(self._isnum(r[c]) for r in self.data)
        ]
        for c in num_cols:
            vals = [float(r[c]) for r in self.data]
            desc[c] = {
                "count": len(vals),
                "mean": sum(vals) / len(vals) if vals else 0,
                "min": min(vals) if vals else None,
                "max": max(vals) if vals else None,
            }
        return desc

    def _isnum(self, x):
        """Return True if ``float(x)`` succeeds."""
        try:
            float(x)
        except (TypeError, ValueError, OverflowError):
            return False
        return True

    def join(self, other, on, how="inner"):
        """Sort-merge join with ``other`` on the ``on`` columns.

        Args:
            other: The right-hand DataFrame.
            on: Column name or iterable of column names present in both
                frames; their values must be mutually orderable.
            how: ``"inner"``, ``"left"``, ``"right"`` or ``"outer"``. Any
                other value behaves like ``"inner"``.

        Returns:
            A frame whose columns are this frame's columns followed by the
            columns only ``other`` has. Rows with equal keys are combined
            as a full cross product; where both sides share a non-key
            column the right-hand value wins. Unmatched rows kept by
            left/right/outer joins have ``None`` in the other side's
            columns.
        """
        on = self._as_cols(on)

        def row_key(r):
            return tuple(r[k] for k in on)

        left = sorted(self.data, key=row_key)
        right = sorted(other.data, key=row_key)
        keep_left = how in ("left", "outer")
        keep_right = how in ("right", "outer")

        out = []
        i = j = 0
        while i < len(left) and j < len(right):
            lk = row_key(left[i])
            rk = row_key(right[j])
            if lk == rk:
                # Find the full run of equal keys on each side so that
                # many-to-many matches yield every left/right pairing.
                i_end = i
                while i_end < len(left) and row_key(left[i_end]) == lk:
                    i_end += 1
                j_end = j
                while j_end < len(right) and row_key(right[j_end]) == rk:
                    j_end += 1
                out.extend(
                    {**l_row, **r_row}
                    for l_row in left[i:i_end]
                    for r_row in right[j:j_end]
                )
                i, j = i_end, j_end
            elif lk < rk:
                if keep_left:
                    out.append(left[i])
                i += 1
            else:
                if keep_right:
                    out.append(right[j])
                j += 1
        if keep_left:
            out.extend(left[i:])
        if keep_right:
            out.extend(right[j:])

        # Deterministic column order (a set would vary between runs).
        cols = list(self.columns)
        seen = set(cols)
        for c in other.columns:
            if c not in seen:
                seen.add(c)
                cols.append(c)
        return DataFrame(out, cols)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment