""" Examples of use: Check whether two versions of pre-test are different in total scores UEDRandomization.py UED_interview_data.csv prompt_pre Lizard Fish total Check for sig of pre to post change in systematic variation with randomization across all prompts UEDRandomization.py UED_interview_data.csv none none none systvar prepost false Check for sig of pre to post change in replication scores with randomization only within prompts UEDRandomization.py UED_interview_data.csv none none none replication prepost true Check for sig of pre to post change in total scores for LC treatment UEDRandomization.py UED_interview_data.csv treatment LC none total prepost false Check for sig of pre to post change between WF and NF treatments UEDRandomization.py UED_interview_data.csv treatment WF NF total prepostdiff false """ from __future__ import print_function import sys import re import csv import random import math # The input file is a csv file with the following columnds indices = {"treatment": 0, "login": 1, "student_id": 2, "transcript_id": 3, "systvar_pre": 4, "systvar_post": 5, "addhyp_pre": 6, "addhyp_post": 7, "replication_pre": 8, "replication_post": 9, "duration_pre": 10, "duration_post": 11, "pcvs_pre": 12, "pcvs_post": 13, "dv_pre": 14, "dv_post": 15, "prompt_pre": 16, "prompt_post": 17, "total_pre": 18, "total_post": 19, "edct_pre": 20, "edct_post": 21, "e1bestexp": 22, "e1bestbio": 23, "e1bestmatchhyp": 24, "e1bestnummanip": 25, "e2firstexp": 26, "e2firstbio": 27, "e2firstmatchhyp": 28, "e2firstnummanip": 29, "dec_pcv_pre":30, "dec_pcv_post":31, "dec_dv_pre":32, "dec_dv_post":33, "conc_goal_pre": 34, "conc_goal_post": 35, "conc_cntrl_pre": 36, "conc_cntrl_post": 37, "conc_pcv_pre": 38, "conc_pcv_post": 39, "conc_dur_pre": 40, "conc_dur_post": 41, "dec_total_pre": 42, "dec_total_post": 43, "conc_total_pre": 44, "conc_total_post": 45, "calculated_value": 46 # used if the randomization should be on a calculated var (like post-pre) } num_randomizations = 10000 # Number of times to randomize and recalculate statistic def loadDataFile(filename): """ Load the file with the student data """ data = {} with open(filename, 'r') as csvfile: # creating a csv reader object csvreader = csv.reader(csvfile) # Get rid of first line. Assume order of columns is as given in the input_file_indices above next(csvreader) for row in csvreader: id = row[indices["student_id"]] data[id] = row return data def calculateDiff(data, independent_var_index, ind_val1, ind_val2, measure_index): """ Assumes there are two sets of values being compared with names ind_val1 and ind_val2 in the independent_var_index column returns the signed difference of avg(ind_val2) - avg(ind_val1) """ total1 = 0 # Total of all values tagged with sort_val1 total2 = 0 # Total of all values tagged with sort_val2 num_values1 = 0 num_values2 = 0 for subject in data: # Go through all the participants sort_val = data[subject][indices[independent_var_index]] # Find the value in the column on which we are sorting measure_val = float(data[subject][indices[measure_index]]) # Find the dependent variable value if(sort_val == ind_val1): total1 += measure_val num_values1 += 1 elif(sort_val == ind_val2): total2 += measure_val num_values2 += 1 else: # Not necessarily an error if there are more than 2 values in the sort column print("Error - sort value not recognized: " + sort_val + " ", end=" ") # Calculate means mean1 = total1/float(num_values1) mean2 = total2/float(num_values2) # Calculate the difference between the averages mean_diff = mean2 - mean1 # Calculate variance (s^2) for each sample total1 = 0 # Total of all values tagged with sort_val1 total2 = 0 # Total of all values tagged with sort_val2 num_values1 = 0 num_values2 = 0 for subject in data: # Go through all the participants sort_val = data[subject][indices[independent_var_index]] # Find the value in the column on which we are sorting measure_val = float(data[subject][indices[measure_index]]) # Find the dependent variable value if(sort_val == ind_val1): total1 += (measure_val - mean1) ** 2 num_values1 += 1 elif(sort_val == ind_val2): total2 += (measure_val - mean2) ** 2 num_values2 += 1 #else: # Not necessarily an error if there are more than 2 values in the sort column var1 = total1 / float(num_values1 - 1) var2 = total2 / float(num_values2 - 1) # Calculate the t-statistic pooled_var = (var1 * (num_values1 - 1) + var2 * (num_values2 - 1)) / (num_values1 + num_values2 - 2) # pooled var t_stat = (mean2 - mean1) / (pooled_var * math.sqrt(1.0/num_values1 + 1.0/num_values2)) # Two methods for calculating difference. First is simply difference between the means. Second is # calculate t-statistic, which includes variance. # Here comment out the one that don't want. Note that t-stat is not tested as well as diff as of this writing # return mean_diff # Difference in means return abs(t_stat) # Absolute value of t-stat (taking absolute makes p-value one-sided, so p=0.05 threshold happens if <5% of values are > real difference def calculatePrePost(data, independent_var_index, value_of_interest, compare_index_pre, compare_index_post): """ Calculate the avg post-pre test difference for the sort_val group in the sort_index column. The compare indices are the indices for the data columns with the pre and post data in them. """ total = 0 num_values = 0 for subject in data: # if data exists for subject both pre and post if(data[subject][indices[compare_index_post]] != "N/A" and data[subject][indices[compare_index_pre]] != "N/A"): diff = int(data[subject][indices[compare_index_post]]) - int(data[subject][indices[compare_index_pre]]) if(independent_var_index is None): total += diff num_values += 1 else: ind_val = data[subject][indices[independent_var_index]] if(value_of_interest == "None" or ind_val == value_of_interest): total += diff num_values += 1 return total / float(num_values) def randomizeData(data, columns, independent_var, values_of_interest, randomize_within_prompts): """ Shuffling algorithm drawn from here: https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle#The_modern_algorithm If independent_var_index is not None, only randomizes subjects whose independent_var value is within values_of_interest Randomizing within prompts, when true, doesn't mix scores between the two version of the assessment - i.e. Lizard scores randomized with other lizard scores, and similarly for Fish. This is a control for any effects of prompt on results. This is written specifically for the alternate EDAT test so assume that Lizard and Fish are the two choices, and which one is which is given in the prompt_pre and prompt_post columns. In order to do this, the first column in 'columns' is assumed to be the pre and the second the post.""" # Make copy of data data2 = data.copy() data_list_lizard = [] # If randomize_with_prompts is false, this will contain all the data data_list_fish = [] # Put all the values into one list for subject in data: if(independent_var is not None): if(data[subject][indices[independent_var]] not in values_of_interest): continue c = 0 # c = 0 means its the pre column, c = 1 means its the post column # bit of a hack, but lots faster than looking for "pre" or "post" in the column name for column in columns: if(randomize_within_prompts): # If the test from which this score came was the Lizard test if((c == 0 and data[subject][indices['prompt_pre']] == "Lizard") or (c == 1 and data[subject][indices['prompt_post']] == "Lizard")): data_list_lizard.append(data[subject][indices[column]]) else: data_list_fish.append(data[subject][indices[column]]) c += 1 else: # If ignoring test prompt, just use the lizard list and not the fish list data_list_lizard.append(data[subject][indices[column]]) # go through each data cell to be randomized and switch that data point with another point # do separately for lizard and fish lists num_lizard = len(data_list_lizard) for i in range(0, num_lizard-1): j = random.randint(i, num_lizard-1) temp = data_list_lizard[i] data_list_lizard[i] = data_list_lizard[j] data_list_lizard[j] = temp num_fish = len(data_list_fish) for i in range(0, num_fish-1): j = random.randint(i, num_fish-1) temp = data_list_fish[i] data_list_fish[i] = data_list_fish[j] data_list_fish[j] = temp # Put the randomized values back into the data dictionary and return i_lizard = 0 i_fish = 0 for subject in data2: if(independent_var is not None): if(data[subject][indices[independent_var]] not in values_of_interest): continue c = 0 # as above for column in columns: if(randomize_within_prompts): # If the test to which this score is assigned was the Lizard test if((c == 0 and data[subject][indices['prompt_pre']] == "Lizard") or (c == 1 and data[subject][indices['prompt_post']] == "Lizard")): data2[subject][indices[column]] = data_list_lizard[i_lizard] i_lizard += 1 else: data2[subject][indices[column]] = data_list_fish[i_fish] i_fish += 1 c += 1 else: # If ignoring test prompt, just use the lizard list and not the fish list data2[subject][indices[column]] = data_list_lizard[i_lizard] i_lizard += 1 return data2 def randomizeDataWithinSubject(data, columns): """ Randomize pre/post scores for the columns of interest within the subject - in other words, for any given student, take their pre and post scores and randomly put them back in the pre and post slots for that student. This may be a more valid way of testing whether post scores differ from pre scores as it doesn't remove the influence of individual students skills, but tests the null that the students skill didn't change with the treatment.""" # Make copy of data data2 = data.copy() for subject in data: data_list = [] for column in columns: data_list.append(data[subject][indices[column]]) # go through each data cell to be randomized and switch that data point with another point # do separately for lizard and fish lists num = len(data_list) for i in range(0, num-1): j = random.randint(i, num-1) temp = data_list[i] data_list[i] = data_list[j] data_list[j] = temp # Put the randomized values back into the subjects record i = 0 for column in columns: data2[subject][indices[column]] = data_list[i] i += 1 return data2 def main(): """ The main program block :return: nothing """ input_filename = sys.argv[1] data = loadDataFile(input_filename) if(len(sys.argv) < 4): print ("Need more arguments. \n input_file.csv independent_var ind_val1 ind_val2 comparison_var [prepost|prepostdiff] [randomize_within_prompts]") # The independent variable that is being compared independent_var = sys.argv[2] ind_val1 = None ind_val2 = None if(independent_var.lower() == "none"): independent_var = None # Use all data, for instance to test pre-post across all treatments else: ind_val1 = sys.argv[3] # If comparing pre-post significance for a single treatment condition, ind_val1 is the condition ind_val2 = sys.argv[4] # If comparing between two values of an independent variable, these are the two values being compared # The comparison variable is the dependent variable - i.e. some measurement of student success comparison = sys.argv[5] compare_index_pre = comparison + "_pre" compare_index_post = comparison + "_post" # The following two are mutually exclusive randomize_across_pre_post = False # if true, randomizing values across both pre and post test columns. # Use for testing if a pre to post difference is real randomize_pre_post_change = False # if true, randomizing the post-pre values for each subject. Use if # comparing two treatments and want to know if the change is different between # treatments. In this case, it randomizes the changes rather than individual # pre/post scores if(len(sys.argv) > 6): if(sys.argv[6] == "prepost"): randomize_across_pre_post = True elif(sys.argv[6] == "prepostdiff"): randomize_pre_post_change = True randomize_within_prompts = False # If true, randomization occurs only within Lizard and Fish prompts, not across those prompts # (i.e. Lizard scores randomized with other Lizard scores) if(len(sys.argv) > 7): if(sys.argv[7].lower() == "true"): randomize_within_prompts = True # if randomizing the change in score rather than the scores, make a new column of data that contains # the change data we are interested in so then using a single column of data in randomizing if(randomize_pre_post_change): for subject in data: data[subject].append(0) # Add a spot in the data array for the calculated value val = float(data[subject][indices[compare_index_post]]) - float(data[subject][indices[compare_index_pre]]) data[subject][indices['calculated_value']] = val comparison = 'calculated_value' # The new dependent value if(randomize_across_pre_post): real_diff = abs(calculatePrePost(data, independent_var, ind_val1, compare_index_pre, compare_index_post)) less_than = 0 more_than = 0 for i in range(0,num_randomizations): diff = abs(calculatePrePost(randomizeDataWithinSubject(data, [compare_index_pre, compare_index_post]), None, None, compare_index_pre, compare_index_post)) # diff = calculatePrePost(randomizeData(data, [compare_index_pre, compare_index_post], independent_var, [ind_val1, ind_val2], randomize_within_prompts), None, None, compare_index_pre, compare_index_post) if(diff < real_diff): less_than += 1 elif(diff > real_diff): more_than += 1 else: real_diff = abs(calculateDiff(data, independent_var, ind_val1, ind_val2, comparison)) less_than = 0 more_than = 0 for i in range(0,num_randomizations): diff = abs(calculateDiff(randomizeData(data, [comparison], independent_var, [ind_val1, ind_val2], randomize_within_prompts), independent_var, ind_val1, ind_val2, comparison)) if(diff < real_diff): less_than += 1 elif(diff > real_diff): more_than += 1 if(independent_var is not None): print("Independent var: " + independent_var + " comparing " + ind_val1 + " and " + ind_val2) print("Dependent var: " + comparison) print ("Real diff = " + str(real_diff) + ": random diff less " + str(less_than / float(num_randomizations) * 100) + \ "% : random diff more " + str(more_than / float(num_randomizations) * 100) + "%" + " : random diff equal " + str((num_randomizations - more_than - less_than) / float(num_randomizations) * 100) + "%") main() def randomizeDataWithReplacement (data, columns, randomize_within_prompts): """ Randomizes the data with replacement, so each randomized point is selected from all original data points Randomizing within prompts, when true, doesn't mix scores between the two version of the assessment - i.e. Lizard scores randomized with other lizard scores, and similarly for Fish. This is a control for any effects of prompt on results. This is written specifically for the alternate EDAT test so assume that Lizard and Fish are the two choices, and which one is which is given in the prompt_pre and prompt_post columns. In order to do this, the first column in 'columns' is assumed to be the pre and the second the post.""" # Make copy of data data2 = data.copy() data_list_lizard = [] # If randomize_with_prompts is false, this will contain all the data data_list_fish = [] # Put all the values into one list for subject in data: for column in columns: if(randomize_within_prompts): # If the test from which this score came was the Lizard test if((column == 0 and data[subject][indices['prompt_pre']] == "Lizard") or (column == 1 and data[subject][indices['prompt_post']] == "Lizard")): data_list_lizard.append(data[subject][indices[column]]) else: data_list_fish.append(data[subject][indices[column]]) else: # If ignoring test prompt, just use the lizard list and not the fish list data_list_lizard.append(data[subject][indices[column]]) # Put the randomized values back into the data dictionary and return. Each new value is drawn from all # original values so one original value can be used multiple times in randomized data (i.e. drawing with replacement) num_lizard = len(data_list_lizard) num_fish = len(data_list_fish) for subject in data2: for column in columns: if(randomize_within_prompts): # If the test to which this score is assigned was the Lizard test if((column == 0 and data[subject][indices['prompt_pre']] == "Lizard") or (column == 1 and data[subject][indices['prompt_post']] == "Lizard")): data2[subject][indices[column]] = data_list_lizard[random.randint(0, num_lizard-1)] else: data2[subject][indices[column]] = data_list_fish[random.randint(0, num_fish-1)] else: # If ignoring test prompt, just use the lizard list and not the fish list data2[subject][indices[column]] = data_list_lizard[random.randint(0, num_lizard-1)] return data2