Skip to content

Instantly share code, notes, and snippets.

@rajibchy
Created August 24, 2022 05:01
Show Gist options
  • Select an option

  • Save rajibchy/d29222aea8fa3424a0f6829c14956a5d to your computer and use it in GitHub Desktop.

Select an option

Save rajibchy/d29222aea8fa3424a0f6829c14956a5d to your computer and use it in GitHub Desktop.
🚀 Memory and disk file cache mechanism with C++ 💯 🌹
// Copyright (c) 2022 Safe Online World Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
// 6:39 PM 11/21/2021
// by Rajib Chy
#include "cfile-stream.h"
#include <stdexcept>
// Creates a stream bound to `path` with a fixed per-record payload size.
// The caller's string is swapped into the object, so after construction
// `path` holds the (empty) value the member had before the swap.
cfile_stream::cfile_stream( std::string& path, size_t max_alloc_size )
	: _max_alloc_size( max_alloc_size )
	, _locker( std::make_shared<std::mutex>( ) ) {
	_path.swap( path );
}
// Creates a stream bound to a copy of `path` with a fixed per-record payload size.
// The path is copied through its C string, so it ends at the first NUL character.
cfile_stream::cfile_stream( const std::string& path, size_t max_alloc_size )
	: _path( path.c_str( ) )
	, _max_alloc_size( max_alloc_size )
	, _locker( std::make_shared<std::mutex>( ) ) {
}
// Opens the data file without taking the lock ("nl" = no lock); callers are
// expected to hold `_locker` themselves.
// Returns FALSE when the object is disposed or the file could not be opened,
// TRUE when a usable stream is available afterwards.
_Check_return_ int cfile_stream::open_nl( ) {
	if ( _is_disposed == TRUE ) {
		return FALSE;
	}
	// Keep using the current stream while it is healthy; otherwise drop it
	// and fall through to create a fresh one.
	if ( _is_open == TRUE && _fstream != NULL ) {
		if ( _fstream->good( ) ) {
			return TRUE;
		}
		_fstream.reset( );
	}
	// Open (or create) the data file in binary mode.
	_fstream = std::make_shared<std::ofstream>( );
	_fstream->open( _path, std::ios::ate | std::ios::binary );
	if ( _fstream->good( ) ) {
		_is_open = TRUE;
	} else {
		_is_open = FALSE;
	}
	return _is_open;
}
// Opens the data file while holding the object's mutex.
// Returns FALSE immediately when the stream is already open or the object is
// disposed; otherwise returns whatever open_nl( ) reports.
_Check_return_ int cfile_stream::open( ) {
	if ( _is_disposed == TRUE || _is_open == TRUE ) {
		return FALSE;
	}
	// The guard covers the critical section and releases the mutex on return.
	std::lock_guard<std::mutex> guard( *_locker );
	return open_nl( );
}
// Issues the special "position -1" write with an empty payload, unless the
// object is already disposed.
// NOTE(review): with an empty payload that write only rewinds the stream and
// emits zero bytes - confirm whether truncating the file was intended.
void cfile_stream::clean( ) {
	if ( _is_disposed == TRUE ) {
		return;
	}
	const std::string nothing;
	this->write( nothing, -1 );
}
// Rewrites the file header with the current record count (`_my_line`).
//
// Header layout: an optional file qualifier (only when `_fqualifier` > 0)
// followed by the record count.
//
// add_qualifier: when TRUE and a qualifier is configured, the qualifier is
//                written as well; otherwise it is skipped over and only the
//                record count is rewritten.
//
// The caller must hold the lock and `_fstream` must be open.
void cfile_stream::update_header( int add_qualifier ) {
	if ( _fqualifier > 0 && add_qualifier != TRUE ) {
		// Leave the qualifier that is already on disk untouched.
		_fstream->seekp( static_cast<std::streamoff>( sizeof( _fqualifier ) ), std::ios_base::beg );
	} else {
		// Seek with an explicit offset + direction. Passing std::ios_base::beg
		// alone would be read as an absolute position and only works when the
		// enumerator happens to be 0.
		_fstream->seekp( 0, std::ios_base::beg );
		if ( _fqualifier > 0 ) {
			_fstream->write( reinterpret_cast<const char*>( &_fqualifier ), sizeof( _fqualifier ) );
		}
	}
	_fstream->write( reinterpret_cast<const char*>( &_my_line ), sizeof( _my_line ) );
}
// Makes sure the stream is open before a write.
// Returns FALSE for a disposed object, the result of open( ) when the stream
// was not open yet, and the current open flag otherwise.
_Check_return_ int cfile_stream::openx( ) {
	if ( _is_disposed == TRUE ) {
		return FALSE;
	}
	if ( _is_open == FALSE ) {
		// Not opened so far: go through the locking open( ).
		const int opened = this->open( );
		return opened;
	}
	return _is_open;
}
// Deprecated entry point: ensures the stream is open and then rewrites the
// whole file from `data` through the vector overload of write( ).
void cfile_stream::rwrite( const std::vector<std::string>& data ) {
	const int ready = openx( );
	if ( ready != FALSE ) {
		write( data );
	}
}
// Writes one fixed-size record to the stream. Does not lock; the caller must
// hold `_locker` and `_fstream` must be non-null.
//
// data:           payload; it is truncated to `_max_alloc_size` bytes or
//                 zero-padded up to it, so every record occupies
//                 sizeof( size_t ) + `_max_alloc_size` bytes on disk.
// pos:            -1 rewinds the stream and writes `data` verbatim,
//                 -2 appends a new record,
//                 any other value addresses the record with that index
//                 (updated in place when its offset is known, appended
//                 otherwise).
// rewrite_header: when TRUE the header is rewritten after a new record was
//                 added.
//
// An empty payload still occupies a slot; its stream offset is remembered in
// `_unallocated_pos`.
// NOTE(review): read( ) stores record *indexes* in `_unallocated_pos`, while
// this function stores stream *offsets* - confirm which one consumers expect.
void cfile_stream::writex( const std::string& data, int pos, int rewrite_header ) {
	if ( !_fstream->good( ) ) {
		return;
	}
	if ( pos == -1 ) {
		// Explicit offset + direction; std::ios_base::beg on its own would be
		// treated as an absolute position.
		_fstream->seekp( 0, std::ios_base::beg );
		_fstream->write( data.data( ), data.size( ) );
		return;
	}
	bool exists = false;
	size_t spos = 0;
	if ( pos != -2 ) {
		// Stream offset previously recorded for this index (0 when unknown).
		spos = _stream_map[pos];
	}
	if ( spos == 0 || spos == std::string::npos ) {
		// No known slot: the record goes to the next free offset.
		spos = _my_pos;
	} else {
		exists = true;
	}
	if ( spos == 0 ) {
		// Nothing written so far: emit a fresh header and start right after it.
		_my_line = 0;
		update_header( TRUE );
		if ( _fqualifier > 0 ) spos += sizeof( _fqualifier );
		spos += sizeof( spos );
	} else {
		_fstream->seekp( spos );
	}
	size_t element_size = data.size( );
	if ( element_size == 0 ) {
		_unallocated_pos.push_back( spos );
	} else if ( element_size > _max_alloc_size ) {
		// The slot is fixed-size; anything beyond it is dropped.
		element_size = _max_alloc_size;
	}
	// Record header: number of meaningful payload bytes.
	_fstream->write( reinterpret_cast<const char*>( &element_size ), sizeof( element_size ) );
	// Copy the whole string, not data.c_str( ): a C-string copy would stop at
	// the first embedded NUL and corrupt binary payloads whose recorded size
	// is larger than what was actually stored.
	std::string payload( data );
	payload.resize( _max_alloc_size );
	_fstream->write( payload.data( ), _max_alloc_size );
	if ( exists ) {
		return;
	}
	if ( pos != -2 ) {
		_stream_map[pos] = spos;
	} else {
		_stream_map[_my_line] = spos;
		_my_line++;
	}
	_my_pos = spos + _max_alloc_size + sizeof( size_t );
	if ( rewrite_header == TRUE ) {
		update_header( );
	}
}
// Writes a single record under the object's mutex.
// Does nothing when the stream cannot be opened (openx( ) returned FALSE).
// `pos` is forwarded unchanged to writex( ).
void cfile_stream::write( const std::string& data, int pos ) {
	const int ready = openx( );
	if ( ready == FALSE ) {
		return;
	}
	// The guard covers the critical section and releases the mutex on return.
	std::lock_guard<std::mutex> guard( *_locker );
	writex( data, pos );
}
// Streams everything that is left in `src` into `dst` by inserting the
// source's stream buffer into the destination.
void stream_copy( std::ostream& dst, std::istream& src ) {
	std::streambuf* const source_buffer = src.rdbuf( );
	dst << source_buffer;
}
// Deprecated convenience overload: reads the file through the callback
// overload and appends every decoded payload to `vec`. Every record is
// accepted (the callback always answers TRUE).
void cfile_stream::read( std::vector<std::string>& vec ) {
	auto collect = [&vec]( const std::string& data, size_t ) -> int {
		vec.push_back( data );
		return TRUE;
	};
	this->read( collect );
}
// Loads an existing data file and rebuilds the in-memory index.
//
// Steps:
//  1. The file at `_path` is renamed to "<_path>_cpy" and read from there.
//  2. The optional qualifier and the record count are read from the header.
//  3. Every record is handed to `decoder( payload, stream_offset )`. Records
//     with size 0, or for which the decoder returns FALSE, have their index
//     stored in `_unallocated_pos`. `_stream_map`, `_my_pos` and `_my_line`
//     are rebuilt along the way.
//  4. The output stream is opened through open_nl( ) and the renamed file is
//     copied into it; the renamed file is then removed. If the output cannot
//     be opened or the copy fails, the renamed file is moved back instead.
//
// An empty file or a qualifier mismatch skips steps 2-4 and the renamed file
// is removed.
//
// Throws std::runtime_error( "Error renaming" ) when a rename fails.
//
// NOTE(review): the rename happens before the lock is taken and while
// `_fstream` may still be open on `_path` - confirm read( ) is only meant to
// be called before the stream is opened.
void cfile_stream::read( std::function<_Check_return_ int( const std::string&, size_t )> decoder ) {
	if ( !std::filesystem::exists( _path ) ) return;
	std::string rdpath( _path.c_str( ) );
	rdpath.append( "_cpy" );
	if ( std::rename( _path.c_str( ), rdpath.c_str( ) ) ) {
		// std::exception has no portable message constructor; runtime_error
		// derives from it, so existing catch( std::exception& ) sites still work.
		throw std::runtime_error( "Error renaming" );
	}
	// Critical section; released automatically, also when an exception escapes.
	std::lock_guard<std::mutex> guard( *_locker );
	_stream_map.clear( );
	_unallocated_pos.clear( );
	int is_moved = FALSE;
	std::ifstream file;
	// Open the renamed data file as binary.
	file.open( rdpath, std::ifstream::binary | std::ios::in );
	if ( file.good( ) ) {
		bool skip = false;
		size_t flength = 0;
		file.seekg( 0, std::ios::end );
		flength = file.tellg( );
		file.seekg( 0, std::ios::beg );
		size_t stream_pos = 0;
		if ( flength == 0 || flength == std::string::npos ) {
			// Empty (or unmeasurable) file.
			skip = true;
		} else if ( _fqualifier > 0 ) {
			size_t fq = 0;
			// Read the file qualifier and compare it with the expected one.
			file.read( reinterpret_cast<char*>( &fq ), sizeof( fq ) );
			if ( fq != _fqualifier ) {
				skip = true;
			} else {
				stream_pos = sizeof( _fqualifier );
			}
		}
		if ( !skip ) {
			size_t size = 0;
			// Header: number of records the file claims to contain.
			file.read( reinterpret_cast<char*>( &size ), sizeof( size ) );
			stream_pos += sizeof( size );
			size_t loaded = 0;
			std::string payload;
			for ( ; loaded < size; ++loaded ) {
				size_t element_size = 0;
				// Record header: number of meaningful payload bytes.
				file.read( reinterpret_cast<char*>( &element_size ), sizeof( element_size ) );
				// Every slot occupies exactly _max_alloc_size bytes on disk.
				payload.assign( _max_alloc_size, '\0' );
				file.read( &payload[0], _max_alloc_size );
				if ( !file ) {
					// Truncated or corrupt file: the header promised more
					// records than are present. Stop instead of indexing
					// garbage for the remaining (possibly huge) count.
					break;
				}
				if ( element_size < _max_alloc_size ) {
					// Drop the padding.
					payload.resize( element_size );
				}
				if ( element_size == 0 ) {
					// Unallocated slot found; keep it for re-allocation.
					_unallocated_pos.push_back( loaded );
				} else if ( decoder( payload, stream_pos ) == FALSE ) {
					// Rejected by the consumer; keep the slot for re-allocation.
					_unallocated_pos.push_back( loaded );
				}
				_stream_map[loaded] = stream_pos;
				stream_pos += ( _max_alloc_size + sizeof( size_t ) );
			}
			_my_pos = stream_pos;
			_my_line = loaded;
			if ( open_nl( ) == FALSE ) {
				// Could not create the output file: put the original back.
				file.close( ); is_moved = TRUE;
				if ( std::rename( rdpath.c_str( ), _path.c_str( ) ) ) {
					throw std::runtime_error( "Error renaming" );
				}
			} else {
				// A short read leaves eof/fail bits set, which would turn the
				// rewind and the copy below into silent no-ops.
				file.clear( );
				file.seekg( 0, std::ios::beg );
				stream_copy( *_fstream, file );
				if ( _fstream->good( ) ) {
					_fstream->flush( );
				} else {
					std::cout << "Data loss detected in " << _path << std::endl;
					_is_open = FALSE;
					_fstream->close( );
					file.close( ); is_moved = TRUE;
					if ( std::rename( rdpath.c_str( ), _path.c_str( ) ) ) {
						throw std::runtime_error( "Error renaming" );
					}
				}
			}
		}
		if ( is_moved == FALSE ) {
			file.close( );
		}
	}
	if ( is_moved == FALSE ) {
		// The temporary copy is no longer needed.
		std::remove( rdpath.c_str( ) );
	}
}
// Queues `data` for a later flush( ) while holding the mutex.
// A negative `pos` queues an append; any other value queues an update of the
// record with that index (a later enqueue for the same index replaces it).
// Ignored once the object is disposed.
void cfile_stream::enqueue( const std::string& data, int pos ) {
	if ( _is_disposed == TRUE ) {
		return;
	}
	// The guard covers the critical section and releases the mutex on return.
	std::lock_guard<std::mutex> guard( *_locker );
	if ( pos >= 0 ) {
		_update_map[pos] = data;
		return;
	}
	_cache_map.push_back( data );
}
// Writes everything queued by enqueue( ) to the stream. Does not lock; the
// caller is expected to hold `_locker`.
// Queued updates are written first, then queued appends; the header is
// rewritten once at the end if the record count changed.
// Returns the number of stages that did work (0 when nothing was pending or
// the stream could not be opened before the first stage).
_Check_return_ int cfile_stream::try_flush( ) {
	int stages_done = FALSE;
	const size_t lines_before = _my_line;
	if ( !_update_map.empty( ) ) {
		if ( open_nl( ) == FALSE ) return stages_done;
		for ( const auto& entry : _update_map ) {
			// key = record index, value = new payload
			writex( entry.second, static_cast<int>( entry.first ), FALSE );
		}
		_update_map.clear( );
		++stages_done;
	}
	if ( !_cache_map.empty( ) ) {
		if ( open_nl( ) == FALSE ) return stages_done;
		for ( const std::string& pending : _cache_map ) {
			// -2 = append as a new record
			writex( pending, -2, FALSE );
		}
		_cache_map.clear( );
		++stages_done;
	}
	if ( lines_before != _my_line ) {
		// Record count changed: persist it in the header.
		update_header( );
		++stages_done;
	}
	return stages_done;
}
// Rewrites the file from the start with the non-empty strings in `data` and
// rebuilds the in-memory index (`_stream_map`, `_my_pos`, `_my_line`).
//
// data:  payloads to store; empty strings are skipped, longer ones are
//        truncated to `_max_alloc_size`, shorter ones are zero-padded.
// force: when the stream is not open, TRUE opens it first and FALSE makes the
//        call a no-op.
//
// Layout written: [qualifier, only if `_fqualifier` > 0][record count] then
// per record [payload size][payload padded to `_max_alloc_size`].
void cfile_stream::write( const std::vector<std::string>& data, int force ) {
	if ( _is_disposed == TRUE ) return;
	if ( _is_open == FALSE ) {
		if ( force == FALSE ) return;
		if ( this->open( ) == FALSE ) return;
	}
	// Critical section; released automatically on return.
	std::lock_guard<std::mutex> guard( *_locker );
	if ( !_fstream->good( ) ) {
		return;
	}
	_stream_map.clear( );
	_unallocated_pos.clear( );
	// The header must hold the number of records actually written. Empty
	// strings are skipped below, so data.size( ) would overstate the count and
	// make a later read( ) run past the stored records.
	size_t count = 0;
	for ( const std::string& item : data ) {
		if ( !item.empty( ) ) ++count;
	}
	// Explicit offset + direction; std::ios_base::beg on its own would be
	// treated as an absolute position.
	_fstream->seekp( 0, std::ios_base::beg );
	size_t stream_pos = sizeof( count );
	if ( _fqualifier > 0 ) {
		// Header part 1: file qualifier.
		_fstream->write( reinterpret_cast<const char*>( &_fqualifier ), sizeof( _fqualifier ) );
		stream_pos += sizeof( _fqualifier );
	}
	// Header part 2: record count.
	_fstream->write( reinterpret_cast<const char*>( &count ), sizeof( count ) );
	size_t index = 0;
	std::string payload;
	for ( const std::string& item : data ) {
		if ( item.empty( ) ) continue;
		size_t element_size = item.size( );
		if ( element_size > _max_alloc_size ) {
			// The slot is fixed-size; anything beyond it is dropped.
			element_size = _max_alloc_size;
		}
		// Record header: number of meaningful payload bytes.
		_fstream->write( reinterpret_cast<const char*>( &element_size ), sizeof( element_size ) );
		// Payload, padded or truncated to the fixed slot size.
		payload = item;
		payload.resize( _max_alloc_size );
		_fstream->write( payload.data( ), _max_alloc_size );
		_stream_map[index] = stream_pos;
		stream_pos += ( _max_alloc_size + sizeof( element_size ) );
		++index;
	}
	_my_pos = stream_pos;
	_my_line = index;
}
// Returns the disposed flag: TRUE once close( ) has run, FALSE before that.
_Check_return_ int cfile_stream::is_disposed( ) const {
	const int disposed = _is_disposed;
	return disposed;
}
// Takes the most recently recorded reusable slot out of `_unallocated_pos`
// while holding the mutex.
// Returns std::string::npos when no reusable slot is known.
_Check_return_ size_t cfile_stream::get_unallocated_pos( ) {
	// The guard covers the critical section and releases the mutex on return.
	std::lock_guard<std::mutex> guard( *_locker );
	if ( _unallocated_pos.empty( ) ) {
		return std::string::npos;
	}
	const size_t slot = _unallocated_pos.back( );
	_unallocated_pos.pop_back( );
	return slot;
}
void cfile_stream::set_qualifier( size_t val ) {
_fqualifier = val;
}
// Writes all queued appends/updates to the stream and flushes it, holding the
// mutex for the whole operation. Does nothing when there was nothing queued
// or the stream could not be opened.
void cfile_stream::flush( ) {
	// Critical section; released automatically on return.
	std::lock_guard<std::mutex> guard( *_locker );
	if ( try_flush( ) > 0 && _fstream->good( ) ) {
		// Move to the real end of the file. seekp( std::ios_base::end ) would
		// interpret the enumerator as an absolute byte offset instead.
		_fstream->seekp( 0, std::ios_base::end );
		_fstream->flush( );
	}
}
// Disposes the object: marks it disposed, flushes and closes the stream if it
// is open, and releases the path and all internal containers. Entries that
// are still queued are discarded, not written. A second call is a no-op.
void cfile_stream::close( ) {
	if ( _is_disposed == TRUE ) {
		return;
	}
	// The guard covers the critical section and releases the mutex on return.
	std::lock_guard<std::mutex> guard( *_locker );
	_is_disposed = TRUE;
	if ( _is_open == TRUE ) {
		_is_open = FALSE;
		_fstream->flush( );
		_fstream->close( );
		_fstream.reset( );
	}
	// Swap with a temporary so the path's storage is released as well.
	std::string( ).swap( _path );
	_stream_map.clear( );
	_cache_map.clear( );
	_update_map.clear( );
	_unallocated_pos.clear( );
}
// Disposes the object on destruction; close( ) is a no-op if it already ran.
cfile_stream::~cfile_stream( ) {
	close( );
}
// Copyright (c) 2022 Safe Online World Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.
// 6:12 PM 12/6/2021
// by Rajib Chy
#if defined(_MSC_VER)
#pragma once
#endif //!_MSC_VER
#if !defined(_sow_cfile_stream_h)
#define _sow_cfile_stream_h
#include <string>
#include <mutex>
#include <iostream>
#include <fstream>
#include <sstream>
#include <vector>
#include <filesystem>
#include <functional>
#include <unordered_map>
#include <sow/framework/type-def-base.h>
// Integer boolean constants, defined only when the platform headers have not
// provided them already.
#ifndef FALSE
#define FALSE 0
#endif//!FALSE
#ifndef TRUE
#define TRUE 1
#endif//!TRUE
// Fallbacks for the MSVC-specific annotations used in this header.
#if !defined(_MSC_VER)
// SAL annotation: has no meaning outside MSVC, so it expands to nothing.
#define _Check_return_
// Deprecation marker. __declspec is not understood by GCC/Clang in their
// default modes, so use the GNU attribute there and expand to nothing on any
// other compiler.
#if defined(__GNUC__) || defined(__clang__)
#define _CRT_DEPRECATE_TEXT(_Text) __attribute__((deprecated(_Text)))
#else
#define _CRT_DEPRECATE_TEXT(_Text)
#endif //!__GNUC__ || __clang__
#endif //!_MSC_VER
// Flushes and then closes the stream that `fs` points to.
// The macro expands to two separate statements (each already terminated by a
// semicolon), so it must not be used as the body of an unbraced if/else/loop.
#define cflush_n_exit( fs )\
fs->flush( ); fs->close( );
// File-backed record store with fixed-size slots.
// On-disk layout, as produced by the write paths: an optional qualifier (only
// when one greater than 0 is set), a record count, then one slot per record
// made of a size_t payload length followed by `max_alloc_size` payload bytes.
// Public operations that touch shared state take an internal mutex.
class cfile_stream {
public:
// Binds the object to `path`. The string is swapped into the object, so the
// caller's variable no longer holds the path afterwards.
cfile_stream(
std::string& path, size_t max_alloc_size
);
// Binds the object to a copy of `path`.
cfile_stream(
const std::string& path, size_t max_alloc_size
);
// Calls close( ).
~cfile_stream( );
public:
// Issues write( "", -1 ) unless the object is disposed.
void clean( );
// Marks the object disposed, flushes and closes the stream if it is open and
// clears the path and all internal containers.
void close( );
// Writes everything queued through enqueue( ) and flushes the stream.
// Runs under the internal mutex.
void flush( );
// Opens the data file under the internal mutex.
// Returns FALSE when the file is already open, the object is disposed, or
// opening failed.
_Check_return_ int open( );
// TRUE once close( ) has run.
_Check_return_ int is_disposed( ) const;
// Removes and returns the most recently recorded reusable slot, or
// std::string::npos when none is known. Runs under the internal mutex.
_Check_return_ size_t get_unallocated_pos( );
// Sets the file qualifier; a value greater than 0 is written at the start of
// the file and verified by read( ).
void set_qualifier( size_t val );
// Reads the data file if it exists and rebuilds the record index so that
// records can be updated in place later. `decoder` is called with each
// non-empty payload and the stream offset of its slot; returning FALSE marks
// the record as reusable, as are records stored with size 0.
void read( std::function<_Check_return_ int( const std::string&, size_t )> decoder );
// Queues `data` until the next flush( ): a negative `pos` queues an append,
// any other value queues an update of the record with that index.
void enqueue(const std::string& data, int pos = -1 );
// Writes one record immediately: -2 appends, -1 rewinds the stream and writes
// `data` verbatim, any other value addresses the record with that index.
void write( const std::string& data, int pos = -2 );
// Rewrites the file from the start with the non-empty entries of `data` and
// rebuilds the record index. When the stream is not open, `force` decides
// whether it is opened (TRUE) or the call is skipped (FALSE).
void write( const std::vector<std::string>& data, int force = TRUE );
private:
// Opens the stream if needed; FALSE when disposed or opening failed.
_Check_return_ int openx( );
// Opens the stream without locking ("nl" = no lock).
_Check_return_ int open_nl( );
// Writes the queued updates and appends without locking; returns the number
// of stages that did work.
_Check_return_ int try_flush( );
// Writes one record without locking; see write( const std::string&, int ).
void writex( const std::string& data, int pos, int rewrite_header = TRUE );
// Rewrites the record count in the header, and the qualifier as well when
// `add_qualifier` is TRUE and a qualifier is set.
void update_header( int add_qualifier = FALSE );
public:
// Deprecated: opens the stream if needed and forwards to the vector write( ).
void _CRT_DEPRECATE_TEXT( "cfile_stream::rwrite not supported" ) rwrite( const std::vector<std::string>& data );
// Deprecated: reads every record into `v` through the callback overload.
void _CRT_DEPRECATE_TEXT( "Read file with (cfile_stream::read) callback method" ) read( std::vector<std::string>& v );
private:
// Path of the data file; cleared by close( ).
std::string _path;
// Stream offset at which the next appended record is written.
size_t _my_pos = 0;
// Number of records; this is the count stored in the file header.
size_t _my_line = 0;
// File qualifier; only written/checked when greater than 0.
size_t _fqualifier = 0;
// Fixed payload size of every slot; longer payloads are truncated.
size_t _max_alloc_size = 0;
// Open / disposed flags.
// NOTE(review): fix_atomic is declared outside this file (presumably an
// atomic wrapper) - confirm its copy and ordering semantics.
fix_atomic<int> _is_open = FALSE;
fix_atomic<int> _is_disposed = FALSE;
// Mutex guarding the members below and the stream.
std::shared_ptr<std::mutex> _locker;
// Payloads queued for append by enqueue( ).
std::vector<std::string> _cache_map;
// Slots known to be reusable (empty or rejected records).
std::vector<size_t> _unallocated_pos;
// Payloads queued for in-place update, keyed by record index.
std::unordered_map<size_t, std::string> _update_map;
// Record index -> stream offset of the record's slot.
std::unordered_map<size_t, size_t> _stream_map;
// Output stream on the data file; null until opened and after close( ).
mutable std::shared_ptr<std::ofstream> _fstream;
};
#endif //!_sow_cfile_stream_h
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment