## Requirements

- git
- json (1.8.1)
- mysql (2.9.1)
- sqlite3 (1.3.10)

On Debian-based OS:

- `apt-get install libmysqlclient-dev libsqlite3-dev`

On OSX:

- `brew install mysql`
- `brew install sqlite3`
- `gem install mysql -v '2.9.1'`
- `gem install sqlite3 -v '1.3.10'`

The script, `elb-logs.rb`:

```ruby
#!/usr/bin/env ruby
# coding: UTF-8

# elb-logs.rb
#
# fetch and analyze ELB access logs
# (http://docs.aws.amazon.com/ElasticLoadBalancing/latest/DeveloperGuide/access-log-collection.html)
#
# created on : 2014.03.07
# last update: 2014.03.10
#
# by meinside@gmail.com

require 'bundler/setup'
require 'time' # for Time.parse

require 'my_aws'
require 'my_sqlite'
require 'thor'
require 'geocoder'

DB_FILENAME = 'elb_access_logs.sqlite'

S3_CONFIGS = {
  access_key_id: '__aws_access_key_id__',
  secret_access_key: '__aws_secret_access_key__',
  bucket: '__s3_bucket_name__',
}
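# NOTE: the values above are placeholders; fill in your own AWS credentials and
# the S3 bucket that your ELB delivers access logs to (access logging must be
# enabled on the ELB first, as described in the AWS doc linked above).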

module AWS
  class ELB::AccessLog
    attr_reader :timestamp, :elb, :client_ip, :client_port, :backend_ip, :backend_port, :process_time, :received_bytes, :sent_bytes, :request_method, :request_url, :request_protocol, :response_status
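
    # A (classic) ELB access log line is expected to look roughly like:
    #
    #   2014-03-10T23:29:50.123456Z my-elb 192.0.2.1:54321 10.0.0.1:80 0.00004 0.00052 0.00003 200 200 0 57 "GET http://example.com:80/ HTTP/1.1"
    #
    # i.e. timestamp, elb name, client:port, backend:port, three processing times,
    # ELB and backend status codes, received/sent bytes, and the quoted request,
    # which is what the index-based parsing below relies on.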
    def initialize(line)
      components = line.split(' ')

      # timestamp
      @timestamp = Time.parse(components[0])

      # elb name
      @elb = components[1]

      # client
      @client_ip, @client_port = components[2].split(':')

      # backend (EC2 instance)
      @backend_ip, @backend_port = components[3].split(':')

      # time taken
      @process_time = components[5].to_f

      # recv/send
      @received_bytes = components[9].to_i
      @sent_bytes = components[10].to_i

      # request (HTTP only)
      request = line.scan(/"(.*?)"$/)[0][0]
      @request_method, @request_url, @request_protocol = request.split(/\s/).map(&:strip)
      @request_method = nil if @request_method == '-'
      @request_url = nil if @request_url == '-'
      @request_protocol = nil if @request_protocol == '-'

      # response (HTTP only)
      @response_status = components[8]
      @response_status = nil if @response_status == '-'
    end

    def to_s
      "[#{@elb}] #{@timestamp.strftime('%Y-%m-%d %H:%M:%S.%N')} | #{@client_ip}:#{@client_port} - #{@backend_ip}:#{@backend_port}, #{@process_time} seconds, recv: #{@received_bytes} / send: #{@sent_bytes} bytes" + (@request_method.nil? ? '' : " (#{@request_method} #{@request_protocol} #{@request_url} => #{@response_status})")
    end

    class Helper
      # parse an S3 object key of an ELB access log file into its components
      #
      # log object keys are expected to end with a file name of the (documented) form, roughly:
      #   {aws-account-id}_elasticloadbalancing_{region}_{elb-name}_{end-time}_{elb-ip}_{random-string}.log
      #
      # @param key [String] S3 object key
      # @return [Hash] parsed components
      def self.parse_key(key)
        components = File.basename(key.split('/')[-1], '.*').split('_')
        {
          aws_account: components[0],
          region: components[2],
          elb: components[3],
          datetime: Time.parse(components[4]),
          elb_ip: components[5],
          key: key,
        }
      end
    end

    class Database
      attr_accessor :filepath

      @@db = nil

      private

      def initialize(filepath)
        @filepath = filepath
        @db = MySqlite.open(@filepath)

        # table: fetched
        @db.execute_query('create table if not exists fetched(
            id integer primary key autoincrement,
            aws_account text not null,
            region text not null,
            elb_ip text not null,
            log_time text not null
          )')
        @db.execute_query('create index if not exists idx_fetched on fetched(
            aws_account, region, elb_ip, log_time
          )')

        # table: access_logs
        @db.execute_query('create table if not exists access_logs(
            id integer primary key autoincrement,
            aws_account text not null,
            region text not null,
            elb_ip text not null,
            log_time text not null,
            elb text not null,
            timestamp text not null,
            client_ip text not null,
            client_port integer default null,
            backend_ip text not null,
            backend_port integer default null,
            process_time real not null,
            received_bytes integer not null,
            sent_bytes integer not null,
            request_method text default null,
            request_url text default null,
            request_protocol text default null,
            response_status text default null
          )')
        @db.execute_query('create index if not exists idx_access_logs_1 on access_logs(
            aws_account, region, elb_ip
          )')
        @db.execute_query('create index if not exists idx_access_logs_2 on access_logs(
            aws_account, region, elb_ip, log_time
          )')
        @db.execute_query('create index if not exists idx_access_logs_3 on access_logs(
            timestamp
          )')
        @db.execute_query('create index if not exists idx_access_logs_4 on access_logs(
            client_ip
          )')
        @db.execute_query('create index if not exists idx_access_logs_5 on access_logs(
            response_status
          )')

        # table: geo_locations
        @db.execute_query('create table if not exists geo_locations(
            ip text primary key,
            country_code text default null,
            country_name text default null,
            city text default null,
            latitude real default null,
            longitude real default null
          )')
        @db.execute_query('create index if not exists idx_geo_locations_1 on geo_locations(
            country_code
          )')
        @db.execute_query('create index if not exists idx_geo_locations_2 on geo_locations(
            country_name
          )')
        @db.execute_query('create index if not exists idx_geo_locations_999 on geo_locations(
            latitude,
            longitude
          )')
      end

      public

      def self.instance
        @@db = Database.new(File.join(File.dirname(__FILE__), DB_FILENAME)) unless @@db
        @@db
      end

      # check if logs with given aws_account/region/elb_ip/log_time already exist
      #
      # @param aws_account [String] aws account
      # @param region [String] region
      # @param elb_ip [String] ELB's ip address
      # @param log_time [Time] log time
      #
      # @return [true,false]
      def log_exists?(aws_account, region, elb_ip, log_time)
        @db.execute_query('select count(id) from fetched where aws_account = ? and region = ? and elb_ip = ? and log_time = ?',
          [aws_account, region, elb_ip, log_time.strftime('%Y-%m-%d %H:%M:%S')]
        )[0][0] > 0
      end

      # save log entry
      #
      # @param aws_account [String] aws account
      # @param region [String] region
      # @param elb_ip [String] ELB's ip address
      # @param log_time [Time] log time
      # @param log_entry [AWS::ELB::AccessLog] log entry
      def save_log(aws_account, region, elb_ip, log_time, log_entry)
        # insert to the table
        @db.execute_query(
          'insert into access_logs(
            aws_account, region, elb_ip, log_time,
            elb, timestamp, client_ip, client_port, backend_ip, backend_port, process_time, received_bytes, sent_bytes,
            request_method, request_url, request_protocol, response_status
          ) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)',
          [
            aws_account,
            region,
            elb_ip,
            log_time.strftime('%Y-%m-%d %H:%M:%S'),
            log_entry.elb,
            log_entry.timestamp.strftime('%Y-%m-%d %H:%M:%S.%N'),
            log_entry.client_ip,
            log_entry.client_port,
            log_entry.backend_ip,
            log_entry.backend_port,
            log_entry.process_time,
            log_entry.received_bytes,
            log_entry.sent_bytes,
            log_entry.request_method,
            log_entry.request_url,
            log_entry.request_protocol,
            log_entry.response_status
          ]
        )
      end

      # mark given log as fetched
      #
      # @param aws_account [String] aws account
      # @param region [String] region
      # @param elb_ip [String] ip address
      # @param log_time [Time] log time
      def mark_log_fetched(aws_account, region, elb_ip, log_time)
        # mark this log as fetched
        @db.execute_query(
          'insert into fetched(aws_account, region, elb_ip, log_time) values(?, ?, ?, ?)',
          [aws_account, region, elb_ip, log_time.strftime('%Y-%m-%d %H:%M:%S')]
        )
      end

      # list all ips from saved logs
      #
      # @param option [Hash] option for listing
      # @return [Array<String>] array of ips
      def client_ips(option = {})
        @db.execute_query("select #{option[:unique] ? 'distinct' : ''} client_ip from access_logs order by timestamp").map{|x| x[0]}
      end

      # cache ip and its geo location information for future use
      #
      # @param ip [String] ip address
      # @return [Database::GeoLocation, nil] successfully cached or not
      def cache_geo(ip)
        unless cached = cached_geo(ip)
          info = Geocoder.search(ip).first
          @db.execute_query('insert or replace into geo_locations(ip, country_code, country_name, city, latitude, longitude) values(?, ?, ?, ?, ?, ?)',
            [info.ip, info.country_code, info.country, info.city, info.latitude, info.longitude]
          )
          return GeoLocation.new(info.ip, info.country_code, info.country, info.city, info.latitude, info.longitude)
        end
        nil
      rescue
        puts "* exception while caching ip: #{ip} - #{$!}"
        nil
      end

      # get cached geo location information
      #
      # @param ip [String] ip address
      # @return [Database::GeoLocation, nil] nil if not cached
      def cached_geo(ip)
        geo = @db.execute_query('select * from geo_locations where ip = ?',
          [ip]
        )
        if geo.count > 0
          # rows come back as arrays of column values: (ip, country_code, country_name, city, latitude, longitude)
          row = geo.first
          GeoLocation.new(row[0], row[1], row[2], row[3], row[4], row[5])
        else
          nil
        end
      end

      # get ELB names
      #
      # @return [Array<String>] ELB names
      def elbs
        @db.execute_query('select distinct elb from access_logs order by elb').map{|x| x[0]}
      end

      # get number of ips per country for given ELB name
      #
      # @param elb [String] ELB name
      def num_ips_per_country(elb)
        @db.execute_query('select country_name as country, count(ip) as num_ips
            from geo_locations
            where ip in (select distinct client_ip from access_logs where elb = ?)
            group by country_name
            order by num_ips desc',
          [elb]
        ).map{|x| {country: x[0], num_ips: x[1]}}
      end

      def info
        # from ~ to
        from = @db.execute_query('select timestamp from access_logs order by timestamp limit 1').first[0]
        to = @db.execute_query('select timestamp from access_logs order by timestamp desc limit 1').first[0]

        # number of access logs
        num_access_logs = {}
        elbs.each{|elb|
          num_access_logs[elb] = @db.execute_query('select count(id) from access_logs where elb = ?', [elb]).first[0]
        }
        num_access_logs[:all] = @db.execute_query('select count(id) from access_logs').first[0]

        # number of ips
        num_ips = {}
        elbs.each{|elb|
          num_ips[elb] = @db.execute_query('select count(ip) from geo_locations
              where ip in (select distinct client_ip from access_logs where elb = ?)',
            [elb]
          ).first[0]
        }
        num_ips[:all] = @db.execute_query('select count(distinct ip) from geo_locations').first[0]

        # return
        {
          from: from,
          to: to,
          num_access_logs: num_access_logs,
          num_ips: num_ips,
        }
      end

      class GeoLocation
        attr_accessor :ip, :country_code, :country_name, :city, :latitude, :longitude

        def initialize(ip, country_code, country_name, city, latitude, longitude)
          @ip = ip
          @country_code = country_code
          @country_name = country_name
          @city = city
          @latitude = latitude
          @longitude = longitude
        end

        def to_s
          "[%s] %s(%s), %s (%.4f, %.4f)" % [@ip, @country_name, @country_code, @city, @latitude, @longitude]
        end
      end
    end
  end
end

class Exec < Thor
  default_task :fetch_logs

  desc "fetch_logs", "fetch all ELB access log files from S3"
  method_option :cache_geo, type: :boolean, aliases: '-g', desc: 'also cache geo locations for logged ips'
  method_option :verbose, type: :boolean, aliases: '-v', desc: 'show verbose messages'
  def fetch_logs
    puts "> fetching logs..."

    # configure
    if MyAws::S3.config(S3_CONFIGS)
      num_logs = 0

      # fetch objects from the bucket
      MyAws::S3.bucket(S3_CONFIGS[:bucket]).objects.each{|o|
        # get the database instance
        db = AWS::ELB::AccessLog::Database.instance

        # parse only actual log files
        if o.key =~ /elasticloadbalancing.*?\.log$/
          parsed = AWS::ELB::AccessLog::Helper.parse_key(o.key)
          aws_account = parsed[:aws_account]
          elb = parsed[:elb]
          region = parsed[:region]
          elb_ip = parsed[:elb_ip]
          log_time = parsed[:datetime]

          # check if this log is already fetched/saved
          if db.log_exists?(aws_account, region, elb_ip, log_time)
            puts "* skipping already-fetched log: #{aws_account}/#{region}/#{elb}(#{elb_ip})/#{log_time.strftime('%Y-%m-%d %H:%M:%S')}" if options.verbose?
            next
          else
            num_logs += 1
          end

          # read all log lines
          puts "> processing logs for: #{aws_account}/#{region}/#{elb}(#{elb_ip})/#{log_time.strftime('%Y-%m-%d %H:%M:%S')}" if options.verbose?
          o.read.lines.map{|x| x.strip}.each{|l|
            entry = AWS::ELB::AccessLog.new(l)
            db.save_log(aws_account, region, elb_ip, log_time, entry)
          }

          # mark log as fetched
          db.mark_log_fetched(aws_account, region, elb_ip, log_time)
        end
      }
      puts ">>> fetched #{num_logs} new log file(s)"

      # also cache geo locations if option is provided
      cache_geo if options.cache_geo?
    else
      puts "* S3 configuration failed"
    end
  end

  desc "cache_geo", "cache geo location info for client ips in saved logs"
  method_option :verbose, type: :boolean, aliases: '-v', desc: 'show verbose messages'
  def cache_geo
    puts "> caching geo locations for logged ips..."

    db = AWS::ELB::AccessLog::Database.instance
    num_ips = 0
    db.client_ips({unique: true}).each{|ip|
      if cached = db.cache_geo(ip)
        puts "> cached geo ip: #{cached}" if options.verbose?
        num_ips += 1
      end
    }
    puts ">>> cached #{num_ips} new geo location(s)"
  end

  desc "count_ips", "list number of unique ips per country"
  def count_ips
    puts "> counting ips per country..."

    db = AWS::ELB::AccessLog::Database.instance
    db.elbs.each{|elb|
      puts ">>> ELB: #{elb}"
      db.num_ips_per_country(elb).each{|counts|
        puts "#{counts[:country]}: #{counts[:num_ips]}"
      }
      puts
    }
  end

  desc "info", "show info on database file"
  def info
    puts "> printing info of database..."

    db = AWS::ELB::AccessLog::Database.instance
    puts ">>> database: #{db.filepath}"

    info = db.info
    puts ">>> #{info[:from]} ~ #{info[:to]}"
    puts ">>> number of accesses"
    info[:num_access_logs].each{|k, v|
      puts " #{k}: #{v}"
    }
    puts ">>> number of ips"
    info[:num_ips].each{|k, v|
      puts " #{k}: #{v}"
    }
  end
end

trap('SIGINT') { puts; exit 1 }
Exec.start(ARGV)
```
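
## Usage

A sketch of how to run it, assuming the script above is saved as `elb-logs.rb` alongside the Gemfile below: run `bundle install` once, then:

- `ruby elb-logs.rb fetch_logs -g -v`: fetch new access logs from S3, caching geo locations and printing verbose messages
- `ruby elb-logs.rb cache_geo`: cache geo locations for client ips already saved in the local database
- `ruby elb-logs.rb count_ips`: print unique ip counts per country, per ELB
- `ruby elb-logs.rb info`: print a summary of the local sqlite database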

The Gemfile:

```ruby
# Gemfile
source 'http://rubygems.org'

gem 'meinside-ruby', github: 'meinside/meinside-ruby' # my ruby scripts and libraries
gem 'thor'
gem 'geocoder'
```