Skip to content

Instantly share code, notes, and snippets.

@badjano
Created June 18, 2025 22:02
Show Gist options
  • Select an option

  • Save badjano/7dba7f77ee306e548affe6bb6a8ddee1 to your computer and use it in GitHub Desktop.

Select an option

Save badjano/7dba7f77ee306e548affe6bb6a8ddee1 to your computer and use it in GitHub Desktop.
import csv
import operator
from collections import defaultdict,Counter
class DataFrame:
    """A minimal, dependency-free table whose rows are stored as dicts.

    Attributes:
        columns: Ordered list of column names.
        data: List of row dicts, one per record.
    """

    def __init__(self, data, columns=None):
        """Build a frame from rows.

        Args:
            data: Iterable of rows. With ``columns`` each row may be a
                sequence (matched positionally) or a dict (looked up by
                column name, missing names become ``None``). Without
                ``columns`` it must be a non-empty sequence of dicts and
                the first row's keys define the columns.
            columns: Optional iterable of column names.

        Raises:
            ValueError: If ``columns`` is omitted and ``data`` is empty or
                does not start with a dict.
        """
        if columns is not None:
            # Materialise once so a one-shot iterable is not exhausted
            # before the rows are built.
            self.columns = list(columns)
            self.data = [self._make_row(row) for row in data]
        elif data and isinstance(data[0], dict):
            self.columns = list(data[0].keys())
            self.data = [dict(row) for row in data]
        else:
            raise ValueError("Provide columns with data or data as dicts")

    def _make_row(self, row):
        """Return ``row`` as a fresh dict keyed by ``self.columns``."""
        if isinstance(row, dict):
            # Iterating a dict yields its keys, so zipping it against the
            # column names would store names instead of values.
            return {c: row.get(c) for c in self.columns}
        return dict(zip(self.columns, row))

    @staticmethod
    def _as_columns(cols):
        """Normalise a column spec: a bare string means one column."""
        return [cols] if isinstance(cols, str) else list(cols)

    @classmethod
    def from_csv(cls, fp, delimiter=",", header=True):
        """Load a frame from a CSV file; all values are kept as strings.

        Args:
            fp: Path of the file to read.
            delimiter: Field separator.
            header: When true the first non-blank row names the columns;
                otherwise columns are named ``"0"``, ``"1"``, ... by
                position.

        Raises:
            ValueError: If ``header`` is true and the file has no rows.
        """
        with open(fp, newline='') as f:
            # csv.reader yields [] for blank lines; they carry no record.
            rows = [row for row in csv.reader(f, delimiter=delimiter) if row]
        if header:
            if not rows:
                raise ValueError("CSV file is empty; expected a header row")
            return cls(rows[1:], rows[0])
        width = max((len(row) for row in rows), default=0)
        return cls(rows, [str(i) for i in range(width)])

    def to_csv(self, fp, delimiter=","):
        """Write the frame, header first, to the CSV file at ``fp``."""
        with open(fp, "w", newline='') as f:
            w = csv.DictWriter(f, fieldnames=self.columns, delimiter=delimiter)
            w.writeheader()
            w.writerows(self.data)

    def filter(self, fn):
        """Return a new frame with the rows for which ``fn(row)`` is truthy."""
        return DataFrame([r for r in self.data if fn(r)], self.columns)

    def select(self, *cols):
        """Return a new frame holding only ``cols``, in the given order."""
        return DataFrame([[r[c] for c in cols] for r in self.data], cols)

    def sort(self, *cols, reverse=False):
        """Return a new frame ordered by the values of ``cols``."""
        ordered = sorted(
            self.data,
            key=lambda r: tuple(r[c] for c in cols),
            reverse=reverse,
        )
        return DataFrame(ordered, self.columns)

    def head(self, n=5):
        """Return a new frame with the first ``n`` rows."""
        return DataFrame(self.data[:n], self.columns)

    def tail(self, n=5):
        """Return a new frame with the last ``n`` rows."""
        # data[-0:] is the whole list, so zero needs its own case.
        rows = self.data[-n:] if n else []
        return DataFrame(rows, self.columns)

    def groupby(self, *cols):
        """Split the rows by the values of ``cols``.

        Returns:
            Dict mapping each key tuple to a DataFrame of its rows, in
            order of first appearance.
        """
        groups = defaultdict(list)
        for row in self.data:
            groups[tuple(row[c] for c in cols)].append(row)
        return {k: DataFrame(v, self.columns) for k, v in groups.items()}

    def agg(self, group_cols, agg_col, func):
        """Aggregate ``agg_col`` within each group.

        Args:
            group_cols: Column name or iterable of column names to group by.
            agg_col: Column whose values (converted to ``float``) are
                aggregated.
            func: Callable taking the list of floats of one group.

        Returns:
            A frame with the group columns followed by ``agg_col``.
        """
        group_cols = self._as_columns(group_cols)
        result = []
        for key, frame in self.groupby(*group_cols).items():
            values = [float(r[agg_col]) for r in frame.data]
            row = dict(zip(group_cols, key))
            row[agg_col] = func(values)
            result.append(row)
        return DataFrame(result, [*group_cols, agg_col])

    def count(self, by=None):
        """Count rows.

        Args:
            by: Optional column name or iterable of column names.

        Returns:
            The number of rows when ``by`` is empty; otherwise a list of
            dicts holding each distinct key plus a ``"count"`` entry.
        """
        if by:
            by = self._as_columns(by)
            c = Counter(tuple(r[k] for k in by) for r in self.data)
            return [{**dict(zip(by, k)), "count": v} for k, v in c.items()]
        return len(self.data)

    def unique(self, col):
        """Return the set of distinct values in ``col``."""
        return {r[col] for r in self.data}

    def apply(self, col, fn):
        """Replace every value of ``col`` with ``fn(value)`` in place.

        Returns:
            ``self``, so calls can be chained.
        """
        for r in self.data:
            r[col] = fn(r[col])
        return self

    def map(self, fn):
        """Return a new frame built from ``fn(row)`` for every row.

        ``fn`` receives a copy of each row, so this frame is not modified.
        Keys that ``fn`` adds to a returned dict become new columns,
        appended after the existing ones.
        """
        rows = [fn(dict(r)) for r in self.data]
        columns = list(self.columns)
        for row in rows:
            if isinstance(row, dict):
                for k in row:
                    if k not in columns:
                        columns.append(k)
        return DataFrame(rows, columns)

    def to_list(self):
        """Return the rows as lists of values in column order."""
        return [[r.get(c) for c in self.columns] for r in self.data]

    def to_dicts(self):
        """Return a shallow copy of every row dict."""
        return [dict(r) for r in self.data]

    def __getitem__(self, key):
        """Index by position (row dict), name (column list) or slice (frame).

        Raises:
            KeyError: If ``key`` is not an int, str or slice.
        """
        if isinstance(key, int):
            return self.data[key]
        elif isinstance(key, str):
            return [r[key] for r in self.data]
        elif isinstance(key, slice):
            return DataFrame(self.data[key], self.columns)
        else:
            raise KeyError("Invalid key")

    def __len__(self):
        """Return the number of rows."""
        return len(self.data)

    def __repr__(self):
        """Return a tab-separated preview of the first five rows."""
        # str() keeps the preview working for non-string column names.
        lines = ["\t".join(str(c) for c in self.columns)]
        for r in self.data[:5]:
            lines.append("\t".join(str(r.get(c)) for c in self.columns))
        if len(self.data) > 5:
            lines.append("...")
        return "\n".join(lines)

    def describe(self):
        """Summarise the columns whose values all convert to ``float``.

        Returns:
            Dict mapping column name to its ``count``, ``mean``, ``min``
            and ``max``.
        """
        desc = {}
        num_cols = [
            c for c in self.columns
            if all(self._isnum(r[c]) for r in self.data)
        ]
        for c in num_cols:
            vals = [float(r[c]) for r in self.data]
            desc[c] = {
                "count": len(vals),
                "mean": sum(vals) / len(vals) if vals else 0,
                "min": min(vals) if vals else None,
                "max": max(vals) if vals else None,
            }
        return desc

    def _isnum(self, x):
        """Return whether ``float(x)`` succeeds."""
        try:
            float(x)
        except (TypeError, ValueError, OverflowError):
            return False
        return True

    def join(self, other, on, how="inner"):
        """Sort-merge join with ``other`` on the ``on`` columns.

        Args:
            other: The right-hand DataFrame.
            on: Column name or iterable of column names present in both
                frames; the key values must be mutually orderable.
            how: ``"inner"``, ``"left"``, ``"right"`` or ``"outer"``. Any
                other value behaves like ``"inner"``.

        Returns:
            A frame ordered by join key whose columns are this frame's
            columns followed by the columns only ``other`` has. On a name
            clash the value from ``other`` wins. Cells an unmatched row
            cannot fill are ``None``.
        """
        on = self._as_columns(on)

        def key(row):
            return tuple(row[k] for k in on)

        left = sorted(self.data, key=key)
        right = sorted(other.data, key=key)
        keep_left = how in ("left", "outer")
        keep_right = how in ("right", "outer")
        out = []
        i = j = 0
        while i < len(left) and j < len(right):
            lk = key(left[i])
            rk = key(right[j])
            if lk == rk:
                # Find the full run of equal keys on each side and emit
                # every left/right pairing, not just the first of each.
                i_end = i
                while i_end < len(left) and key(left[i_end]) == lk:
                    i_end += 1
                j_end = j
                while j_end < len(right) and key(right[j_end]) == rk:
                    j_end += 1
                out.extend(
                    {**lrow, **rrow}
                    for lrow in left[i:i_end]
                    for rrow in right[j:j_end]
                )
                i, j = i_end, j_end
            elif lk < rk:
                if keep_left:
                    out.append(left[i])
                i += 1
            else:
                if keep_right:
                    out.append(right[j])
                j += 1
        if keep_left:
            out.extend(left[i:])
        if keep_right:
            out.extend(right[j:])
        # Build the column list in a fixed order; a set would make the
        # output layout vary between runs.
        cols = self.columns + [c for c in other.columns if c not in self.columns]
        return DataFrame(out, cols)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment