Skip to content

Instantly share code, notes, and snippets.

@mikigom
Last active April 25, 2017 07:55
Show Gist options
  • Select an option

  • Save mikigom/7c22cf92a602bd68901ac9ecd596bc4e to your computer and use it in GitHub Desktop.

Select an option

Save mikigom/7c22cf92a602bd68901ac9ecd596bc4e to your computer and use it in GitHub Desktop.
import subprocess
import unittest
import sys
import filecmp
# Path of the shell executable under test: the first command-line argument,
# resolved relative to the current working directory.
# Fail with a usage hint instead of a bare IndexError when it is missing.
if len(sys.argv) < 2:
    sys.exit('usage: {0} <shell-executable>'.format(sys.argv[0]))
bash_call = './' + sys.argv[1]
def subprocess_pipe(cmd_list):
    """Run the commands in ``cmd_list`` as a pipeline and capture the result.

    Each command string is split on whitespace (no shell quoting rules are
    applied) and started with its stdin connected to the stdout of the
    previous command.  The first command is started with ``stdin=None``.

    Args:
        cmd_list: Command strings, in pipeline order.

    Returns:
        The ``(stdoutdata, stderrdata)`` tuple returned by ``communicate()``
        on the last command of the pipeline.

    Raises:
        ValueError: If ``cmd_list`` is empty.
    """
    import os  # local import: only needed for os.devnull

    commands = list(cmd_list)
    if not commands:
        raise ValueError('cmd_list must contain at least one command')

    last_index = len(commands) - 1
    procs = []
    prev_stdout = None
    # Only the last command's stderr is ever returned.  Sending the stderr of
    # the earlier commands to the null device (rather than to a pipe nobody
    # reads) keeps them from blocking once that pipe's buffer fills up.
    devnull = open(os.devnull, 'wb')
    try:
        for index, str_cmd in enumerate(commands):
            proc = subprocess.Popen(
                str_cmd.split(),
                stdin=prev_stdout,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE if index == last_index else devnull
            )
            procs.append(proc)
            if prev_stdout is not None:
                # Drop the parent's copy of the read end so the upstream
                # command receives SIGPIPE if this command exits early.
                prev_stdout.close()
            prev_stdout = proc.stdout
        return procs[-1].communicate()
    finally:
        devnull.close()
        for proc in procs:
            # Only reached with an open stdout when starting a later command
            # failed; close it so no descriptor is leaked.
            if proc.stdout is not None and not proc.stdout.closed:
                proc.stdout.close()
            # Non-blocking reap of children that have already exited.
            proc.poll()
class BashTest(unittest.TestCase):
    """Compare output obtained via ``bash_call`` with the system shell.

    Every test passes ``[bash_call, <command>]`` to ``subprocess_pipe`` and
    runs the same command line with ``subprocess.Popen(..., shell=True)``,
    then asserts that the captured stdout/stderr agree.
    """

    def _assert_same_as_shell(self, test_cmd, homebrew_cmd=None):
        """Assert that the homebrew run and the reference shell run agree.

        Args:
            test_cmd: Command line executed by the reference shell.
            homebrew_cmd: Command line passed along with ``bash_call``;
                defaults to ``test_cmd``.
        """
        if homebrew_cmd is None:
            homebrew_cmd = test_cmd
        homebrew_sh_stdout, homebrew_sh_stderr = subprocess_pipe(
            [bash_call, homebrew_cmd])
        # The reference run merges stderr into stdout, so ``err`` is None and
        # the homebrew stderr is expected to be empty.
        naive_sh = subprocess.Popen(test_cmd, shell=True,
                                    stdout=subprocess.PIPE,
                                    stderr=subprocess.STDOUT)
        out, err = naive_sh.communicate()
        # b'' equals '' on Python 2 and matches the bytes that
        # communicate() returns on Python 3.
        self.assertEqual(homebrew_sh_stdout, b'' if out is None else out)
        self.assertEqual(homebrew_sh_stderr, b'' if err is None else err)

    def test1(self):
        """Plain command."""
        self._assert_same_as_shell('ls')

    def test2(self):
        """Output redirection; the two result files must be identical."""
        self._assert_same_as_shell('ls > out', 'ls > out_homebrew')
        self.assertTrue(filecmp.cmp('out', 'out_homebrew'))

    def test3(self):
        """Input redirection."""
        self._assert_same_as_shell('cat < code.c')

    def test4(self):
        """Input and output redirection combined."""
        self._assert_same_as_shell('cat < code.c > out',
                                   'cat < code.c > out_homebrew')
        self.assertTrue(filecmp.cmp('out', 'out_homebrew'))

    # Disabled in the original source:
    # def test5(self):
    #     test_cmd = 'cat >> out'
    #     cmd_list = [bash_call, test_cmd]
    #     homebrew_sh, _ = subprocess_pipe(cmd_list)
    #     naive_sh = subprocess.check_output(test_cmd, shell=True)
    #     self.assertEqual(homebrew_sh, naive_sh)

    def test6(self):
        """Input redirection followed by a pipe."""
        self._assert_same_as_shell('cat < code.c | more')

    def test7(self):
        """Pipe followed by ``&&``."""
        self._assert_same_as_shell(
            'who | grep hjnam && echo "hjnam is logged in"')

    def test8(self):
        """Pipe followed by ``||``."""
        self._assert_same_as_shell(
            'who | grep hjnam || echo "hjnam not logged in"')

    # Disabled in the original source:
    # def test9(self):
    #     test_cmd = 'exit'
    #     cmd_list = [bash_call, test_cmd]
    #     homebrew_sh, _ = subprocess_pipe(cmd_list)
    #     naive_sh = subprocess.check_output(test_cmd, shell=True)
    #     self.assertEqual(homebrew_sh, naive_sh)
if __name__ == '__main__':
    # The suite is built and run by hand rather than through unittest.main(),
    # because sys.argv[1] carries the shell executable name, not unittest
    # options.
    # unittest.makeSuite() was deprecated in Python 3.11 and removed in 3.13;
    # TestLoader.loadTestsFromTestCase() collects the same "test*" methods.
    TS = unittest.TestLoader().loadTestsFromTestCase(BashTest)
    runner = unittest.TextTestRunner()
    runner.run(TS)
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <wait.h>
#include <sys/types.h>
#include <signal.h>
#include "command_in.c"
#include "comm.h"
Command* comm2block(int* bn){
char* str;
int token_num, i;
str = _getln();
char** tokens = _strtok_by_space(str, &token_num);
int pipe_count = 0;
for(i = 0; i < token_num; ++i){
if(strcmp(tokens[i], ">") == 0 ||
strcmp(tokens[i], "<") == 0 ||
strcmp(tokens[i], ">>") == 0 ||
strcmp(tokens[i], "|") == 0 ||
strcmp(tokens[i], "&&") == 0 ||
strcmp(tokens[i], "||") == 0 ||
strcmp(tokens[i], ";") == 0 ){
pipe_count += 1;
}
}
int queto_switch = 0;
int start_index;
int j;
for(i = 0; i < token_num; ++i){
if(tokens[i][0] == '\"' && tokens[i][strlen(tokens[i]) - 1] != '\"'){
queto_switch = 1;
start_index = i;
}
else if(tokens[i][0] != '\"' && tokens[i][strlen(tokens[i]) - 1] == '\"'){
for(j = token_num - 1; j > i ;--j){
// tokens[j] = (char *) realloc(tokens[j], sizeof(char) * (strlen(tokens[j]) + 1));
// strcat(tokens[j], " ");
// tokens[j] = tokens[j] + sizeof(char);
}
char * tmp = malloc(sizeof(char) * (strlen(tokens[i]) + 1));
strcpy(tmp, tokens[i]);
strcat(tokens[start_index], " ");
strcat(tokens[start_index], tmp);
tokens[i] = "";
queto_switch = 0;
}
else if(queto_switch == 1){
char * tmp = malloc(sizeof(char) * (strlen(tokens[i]) + 1));
strcpy(tmp, tokens[i]);
strcat(tokens[start_index], " ");
strcat(tokens[start_index], tmp);
tokens[i] = "";
}
}
int args_count = 0;
int args_number_by_block[pipe_count+1];
pipe_count = 0;
for(i = 0; i < token_num; ++i){
if(strcmp(tokens[i], ">") == 0 ||
strcmp(tokens[i], "<") == 0 ||
strcmp(tokens[i], ">>") == 0 ||
strcmp(tokens[i], "|") == 0 ||
strcmp(tokens[i], "&&") == 0 ||
strcmp(tokens[i], "||") == 0 ||
strcmp(tokens[i], ";") == 0){
args_number_by_block[pipe_count] = args_count;
pipe_count += 1;
args_count = 0;
}
else{
if(strcmp(tokens[i], "") != 0)
args_count += 1;
}
}
args_number_by_block[pipe_count] = args_count;
int** args_length_by_block_and_order = malloc((pipe_count+1)*sizeof(int*));
for(i = 0; i < pipe_count+1; ++i)
args_length_by_block_and_order[i] = malloc(args_number_by_block[i]*sizeof(int));
args_count = 0;
pipe_count = 0;
for(i = 0; i < token_num; ++i){
if(strcmp(tokens[i], ">") == 0 ||
strcmp(tokens[i], "<") == 0 ||
strcmp(tokens[i], ">>") == 0 ||
strcmp(tokens[i], "|") == 0 ||
strcmp(tokens[i], "&&") == 0 ||
strcmp(tokens[i], "||") == 0 ||
strcmp(tokens[i], ";") == 0 ){
pipe_count += 1;
args_count = 0;
}
else{
args_length_by_block_and_order[pipe_count][args_count] = strlen(tokens[i]);
args_count += 1;
}
}
char* command_ops[pipe_count + 1];
command_ops[0] = "\0";
int k = 0;
char *** command_block_string = malloc((pipe_count + 1)*sizeof(char**));
for(i = 0; i < pipe_count + 1; ++i){
command_block_string[i] = malloc(args_number_by_block[i]*sizeof(char*));
for(k = 0; k < args_number_by_block[i]; ++k)
command_block_string[i][k] = malloc(args_length_by_block_and_order[i][k]*sizeof(char));
}
char** commands = malloc((args_number_by_block[0])*sizeof(char*));
for(i = 0; i < args_number_by_block[0]; ++i){
commands[i] = malloc(args_length_by_block_and_order[0][i]*sizeof(char));
}
pipe_count = 0;
args_count = 0;
for(i = 0; i < token_num; ++i){
if(strcmp(tokens[i], ">") == 0 ||
strcmp(tokens[i], "<") == 0 ||
strcmp(tokens[i], ">>") == 0 ||
strcmp(tokens[i], "|") == 0 ||
strcmp(tokens[i], "&&") == 0 ||
strcmp(tokens[i], "||") == 0 ||
strcmp(tokens[i], ";") == 0 ){
for(k = 0; k < args_number_by_block[pipe_count]; ++k)
command_block_string[pipe_count][k] = commands[k];
pipe_count += 1;
commands = malloc((args_number_by_block[pipe_count])*sizeof(char*));
for(k = 0; k < args_number_by_block[pipe_count]; ++k){
commands[k] = malloc(args_length_by_block_and_order[pipe_count][k]*sizeof(char));
}
args_count = 0;
command_ops[pipe_count] = tokens[i];
}
else{
commands[args_count] = tokens[i];
args_count += 1;
}
}
for(k = 0; k < args_number_by_block[pipe_count]; ++k)
command_block_string[pipe_count][k] = commands[k];
/*
int j = 0;
for(i = 0; i < pipe_count + 1; ++i){
printf("block %d, Args number by block : %d\n", i, args_number_by_block[i]);
printf("%s\n", command_ops[i]);
for(j = 0; j < args_number_by_block[i]; ++j){
printf("%d %d %s\n", i, j, command_block_string[i][j]);
}
}
*/
Command* com_block;
com_block = malloc((pipe_count + 1)*sizeof(Command));
for(i = 0; i < pipe_count + 1; ++i){
com_block[i].commfile = command_block_string[i];
com_block[i].optype = command_ops[i];
com_block[i].commfile_len = args_number_by_block[i];
}
*bn = pipe_count + 1;
return com_block;
}
#define CHUNK 1
// _getln() : Read one line from stdin into a buffer that grows CHUNK bytes at a time
// IN :
// OUT :
// RETURN : dynamically allocated, NUL-terminated line without the trailing
//          newline; NULL when the buffer could not be grown
char* _getln(){
    char* buffer = NULL;
    size_t capacity = 0;
    size_t length = 0;
    int finished = 0;
    do {
        int c = getc(stdin);
        /* End of input or end of line is stored as the terminating NUL. */
        if (c == EOF || c == '\n')
            c = 0;
        /* Make room for one more byte when the buffer is full. */
        if (length >= capacity) {
            char* grown;
            capacity += CHUNK;
            grown = realloc(buffer, capacity);
            if (grown == NULL) {
                free(buffer);
                return NULL;
            }
            buffer = grown;
        }
        buffer[length++] = c;
        /* A NUL byte (stored terminator or literal input) ends the line. */
        finished = (c == 0);
    } while (!finished);
    return buffer;
}
// str_replace : Return a copy of orig with every occurrence of rep replaced by with
// IN : orig(char*) - text to search
//      rep(char*)  - substring to replace (must be non-empty)
//      with(char*) - replacement text; NULL is treated as ""
// RETURN : newly allocated string the caller must free, or NULL when orig or
//          rep is NULL, rep is empty, or the allocation fails
char* str_replace(char *orig, char *rep, char *with) {
    char *result;   // start of the string being built
    char *out;      // current write position inside result
    char *scan;     // read position while counting matches
    char *hit;      // most recent match of rep
    int len_rep;
    int len_with;
    int len_front;  // bytes between the read position and the next match
    int count = 0;  // number of matches of rep in orig

    if (orig == NULL || rep == NULL)
        return NULL;
    len_rep = strlen(rep);
    if (len_rep == 0)
        return NULL; // an empty rep would match forever while counting
    if (with == NULL)
        with = "";
    len_with = strlen(with);

    // First pass: count the matches so the result can be sized exactly.
    scan = orig;
    while ((hit = strstr(scan, rep)) != NULL) {
        ++count;
        scan = hit + len_rep;
    }

    result = malloc(strlen(orig) + (len_with - len_rep) * count + 1);
    if (result == NULL)
        return NULL;

    // Second pass: copy the text before each match, then the replacement.
    out = result;
    for (; count > 0; --count) {
        hit = strstr(orig, rep);
        len_front = hit - orig;
        memcpy(out, orig, len_front);
        out += len_front;
        memcpy(out, with, len_with);
        out += len_with;
        orig = hit + len_rep; // continue right after the replaced match
    }
    // Copy whatever follows the last match, including the terminating NUL.
    strcpy(out, orig);
    return result;
}
// _strtok_by_space : Return arr of string splited by space with dynamic allocation
// IN : str(char*) - string to split; it is modified in place by strtok() and
//                   the returned tokens point into it (NULL yields no tokens)
// OUT : len(int*) - length of arr of string (the trailing NULL is not counted)
// RETURN : dynamic allocated arr of string, terminated by a NULL element;
//          the process exits with -1 when memory allocation fails
char** _strtok_by_space(char* str, int* len){
    char** res = NULL;
    int n_tokens = 0;
    /* strtok(NULL, ...) would resume a previous string, so guard NULL here. */
    char* p = (str != NULL) ? strtok(str, " ") : NULL;

    /* split string and append tokens to 'res' */
    while (p != NULL) {
        res = realloc(res, sizeof (char*) * (n_tokens + 1));
        if (res == NULL)
            exit(-1); /* memory allocation failed */
        res[n_tokens++] = p;
        p = strtok(NULL, " ");
    }

    /* realloc one extra element for the last NULL */
    res = realloc(res, sizeof (char*) * (n_tokens + 1));
    if (res == NULL)
        exit(-1); /* memory allocation failed */
    res[n_tokens] = NULL;
    *len = n_tokens;
    return res;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment