Last active
January 25, 2026 07:45
-
-
Save mdmitry1/8d58e632088e4c46c3a5ed76ae8d340c to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import pytest | |
| # Mark all tests in this directory to run forked | |
| def pytest_collection_modifyitems(items): | |
| for item in items: | |
| # Check if test is in this directory | |
| if "z3" in str(item.fspath): | |
| item.add_marker(pytest.mark.forked) | |
| # Configure logging for forked tests | |
| def pytest_configure(config): | |
| config.option.log_cli = True | |
| config.option.log_cli_level = "INFO" | |
| config.option.log_cli_format = "%(message)s" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python3.12 | |
| import numpy as np | |
| import onnx | |
| from onnx import numpy_helper | |
| from z3 import * | |
| import hashlib | |
| import json | |
| import logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(message)s' | |
| ) | |
| # Custom JSON encoder for numpy arrays | |
| class NumpyEncoder(json.JSONEncoder): | |
| def default(self, obj): | |
| if isinstance(obj, np.ndarray): | |
| return obj.tolist() | |
| if isinstance(obj, np.integer): | |
| return int(obj) | |
| if isinstance(obj, np.floating): | |
| return float(obj) | |
| return super(NumpyEncoder, self).default(obj) | |
| def onnx2json(rootpath: str = ".", model_file: str = "simple_nn.onnx", json_file: str = "simple_nn.json"): | |
| try: | |
| # Load model | |
| model = onnx.load(f'{rootpath}/{model_file}', load_external_data=False) | |
| # Build readable dictionary | |
| model_dict = { | |
| 'ir_version': model.ir_version, | |
| 'producer_name': model.producer_name, | |
| 'graph_name': model.graph.name, | |
| 'inputs': [], | |
| 'outputs': [], | |
| 'initializers': {}, | |
| 'nodes': [] | |
| } | |
| # Parse inputs | |
| for inp in model.graph.input: | |
| shape = [dim.dim_value for dim in inp.type.tensor_type.shape.dim] | |
| model_dict['inputs'].append({'name': inp.name, 'shape': shape}) | |
| # Parse outputs | |
| for out in model.graph.output: | |
| shape = [dim.dim_value for dim in out.type.tensor_type.shape.dim] | |
| model_dict['outputs'].append({'name': out.name, 'shape': shape}) | |
| # Parse initializers with actual values | |
| for init in model.graph.initializer: | |
| array = numpy_helper.to_array(init) | |
| model_dict['initializers'][init.name] = { | |
| 'shape': list(array.shape), | |
| 'dtype': str(array.dtype), | |
| 'values': array.tolist() # Convert to list for JSON | |
| } | |
| # Parse nodes | |
| for node in model.graph.node: | |
| node_dict = { | |
| 'op_type': node.op_type, | |
| 'inputs': list(node.input), | |
| 'outputs': list(node.output), | |
| 'attributes': {attr.name: str(attr) for attr in node.attribute} | |
| } | |
| model_dict['nodes'].append(node_dict) | |
| # Save as JSON | |
| with open(f'{rootpath}/{json_file}', "w") as f: | |
| json.dump(model_dict, f, indent=2, cls=NumpyEncoder) | |
| logging.info(f"Saved as JSON with readable weights to {rootpath}/{json_file}") | |
| return 0 | |
| except Exception as e: | |
| logging.error(f"ONNX to JSON conversion failed: {e}") | |
| return 1 | |
| def calculate_file_checksum(file_path, algorithm='sha256'): | |
| """ | |
| Compute the hash of a file using the specified algorithm (e.g., 'md5', 'sha1', 'sha256'). | |
| """ | |
| # Create a hash object for the specified algorithm | |
| hash_func = hashlib.new(algorithm) | |
| # Open the file in binary mode ('rb') | |
| try: | |
| with open(file_path, 'rb') as f: | |
| # Read the file in chunks to handle large files efficiently | |
| for chunk in iter(lambda: f.read(4096), b""): | |
| hash_func.update(chunk) | |
| return hash_func.hexdigest() | |
| except FileNotFoundError: | |
| logging.info(f"Error: File not found at {file_path}") | |
| return None | |
| except ValueError: | |
| logging.info(f"Error: Invalid hash algorithm '{algorithm}'") | |
| return None | |
| class ONNXToZ3Converter: | |
| """Convert ONNX model to Z3 constraints for verification""" | |
| def __init__(self, onnx_model_path=None, onnx_model=None): | |
| if onnx_model_path: | |
| self.model = onnx.load(onnx_model_path) | |
| else: | |
| self.model = onnx_model | |
| self.initializers = {} | |
| self.z3_vars = {} | |
| self.var_counter = 0 | |
| # Load all initializers (weights and biases) | |
| for init in self.model.graph.initializer: | |
| self.initializers[init.name] = numpy_helper.to_array(init) | |
| def _get_unique_var_name(self, prefix): | |
| """Generate unique variable name""" | |
| name = f"{prefix}_{self.var_counter}" | |
| self.var_counter += 1 | |
| return name | |
| def create_input_vars(self, input_shape, prefix="x"): | |
| """Create Z3 variables for network inputs""" | |
| vars_list = [] | |
| total_size = int(np.prod(input_shape)) | |
| for i in range(total_size): | |
| vars_list.append(Real(f"{prefix}_{i}")) | |
| return vars_list | |
| def encode_dense_layer(self, inputs, weight_name, bias_name=None): | |
| """Encode a fully connected (dense) layer as Z3 constraints""" | |
| W = self.initializers[weight_name] | |
| b = self.initializers.get(bias_name, np.zeros(W.shape[1])) | |
| # W shape: (input_dim, output_dim) | |
| outputs = [] | |
| for j in range(W.shape[1]): # For each output neuron | |
| expr = sum(float(W[i, j]) * inputs[i] for i in range(W.shape[0])) | |
| expr = expr + float(b[j]) | |
| outputs.append(expr) | |
| return outputs | |
| def encode_conv2d(self, inputs, input_shape, weight_name, bias_name=None, | |
| stride=(1, 1), padding=(0, 0), dilation=(1, 1)): | |
| """ | |
| Encode 2D convolution as Z3 constraints | |
| Args: | |
| inputs: flattened list of Z3 variables | |
| input_shape: (C_in, H_in, W_in) | |
| weight_name: name of conv kernel in initializers | |
| stride, padding, dilation: conv parameters | |
| """ | |
| kernel = self.initializers[weight_name] # Shape: (C_out, C_in, kH, kW) | |
| bias = self.initializers.get(bias_name, np.zeros(kernel.shape[0])) | |
| C_in, H_in, W_in = input_shape | |
| C_out, _, kH, kW = kernel.shape | |
| # Calculate output dimensions | |
| H_out = (H_in + 2 * padding[0] - dilation[0] * (kH - 1) - 1) // stride[0] + 1 | |
| W_out = (W_in + 2 * padding[1] - dilation[1] * (kW - 1) - 1) // stride[1] + 1 | |
| # Reshape inputs to 3D array for easier indexing | |
| input_tensor = np.array(inputs).reshape(C_in, H_in, W_in) | |
| outputs = [] | |
| for c_out in range(C_out): | |
| for h_out in range(H_out): | |
| for w_out in range(W_out): | |
| # Compute convolution for this output position | |
| h_start = h_out * stride[0] - padding[0] | |
| w_start = w_out * stride[1] - padding[1] | |
| conv_sum = float(bias[c_out]) | |
| for c_in in range(C_in): | |
| for kh in range(kH): | |
| for kw in range(kW): | |
| h_in = h_start + kh * dilation[0] | |
| w_in = w_start + kw * dilation[1] | |
| # Check bounds (for padding) | |
| if 0 <= h_in < H_in and 0 <= w_in < W_in: | |
| idx = c_in * H_in * W_in + h_in * W_in + w_in | |
| conv_sum = conv_sum + float(kernel[c_out, c_in, kh, kw]) * inputs[idx] | |
| outputs.append(conv_sum) | |
| return outputs, (C_out, H_out, W_out) | |
| def encode_maxpool2d(self, inputs, input_shape, kernel_size=(2, 2), | |
| stride=None, padding=(0, 0)): | |
| """ | |
| Encode 2D max pooling as Z3 constraints | |
| Args: | |
| inputs: flattened list of Z3 variables | |
| input_shape: (C, H_in, W_in) | |
| kernel_size: (kH, kW) | |
| """ | |
| if stride is None: | |
| stride = kernel_size | |
| C, H_in, W_in = input_shape | |
| kH, kW = kernel_size | |
| H_out = (H_in + 2 * padding[0] - kH) // stride[0] + 1 | |
| W_out = (W_in + 2 * padding[1] - kW) // stride[1] + 1 | |
| input_tensor = np.array(inputs).reshape(C, H_in, W_in) | |
| outputs = [] | |
| constraints = [] | |
| for c in range(C): | |
| for h_out in range(H_out): | |
| for w_out in range(W_out): | |
| h_start = h_out * stride[0] - padding[0] | |
| w_start = w_out * stride[1] - padding[1] | |
| # Collect all values in the pooling window | |
| pool_values = [] | |
| for kh in range(kH): | |
| for kw in range(kW): | |
| h_in = h_start + kh | |
| w_in = w_start + kw | |
| if 0 <= h_in < H_in and 0 <= w_in < W_in: | |
| idx = c * H_in * W_in + h_in * W_in + w_in | |
| pool_values.append(inputs[idx]) | |
| # Create output variable for max | |
| y = Real(self._get_unique_var_name("maxpool")) | |
| # Add constraints: y >= all values AND y equals at least one | |
| for val in pool_values: | |
| constraints.append(y >= val) | |
| constraints.append(Or([y == val for val in pool_values])) | |
| outputs.append(y) | |
| return outputs, (C, H_out, W_out), constraints | |
| def encode_avgpool2d(self, inputs, input_shape, kernel_size=(2, 2), | |
| stride=None, padding=(0, 0)): | |
| """Encode 2D average pooling""" | |
| if stride is None: | |
| stride = kernel_size | |
| C, H_in, W_in = input_shape | |
| kH, kW = kernel_size | |
| H_out = (H_in + 2 * padding[0] - kH) // stride[0] + 1 | |
| W_out = (W_in + 2 * padding[1] - kW) // stride[1] + 1 | |
| input_tensor = np.array(inputs).reshape(C, H_in, W_in) | |
| outputs = [] | |
| for c in range(C): | |
| for h_out in range(H_out): | |
| for w_out in range(W_out): | |
| h_start = h_out * stride[0] - padding[0] | |
| w_start = w_out * stride[1] - padding[1] | |
| pool_sum = 0 | |
| count = 0 | |
| for kh in range(kH): | |
| for kw in range(kW): | |
| h_in = h_start + kh | |
| w_in = w_start + kw | |
| if 0 <= h_in < H_in and 0 <= w_in < W_in: | |
| idx = c * H_in * W_in + h_in * W_in + w_in | |
| pool_sum = pool_sum + inputs[idx] | |
| count += 1 | |
| outputs.append(pool_sum / count) | |
| return outputs, (C, H_out, W_out) | |
| def encode_batchnorm(self, inputs, gamma_name, beta_name, mean_name, var_name, epsilon=1e-5): | |
| """ | |
| Encode batch normalization: y = gamma * (x - mean) / sqrt(var + eps) + beta | |
| """ | |
| gamma = self.initializers[gamma_name] | |
| beta = self.initializers[beta_name] | |
| mean = self.initializers[mean_name] | |
| var = self.initializers[var_name] | |
| outputs = [] | |
| for i, x in enumerate(inputs): | |
| idx = i % len(gamma) # Handle different batch norm shapes | |
| normalized = (x - float(mean[idx])) / np.sqrt(float(var[idx]) + epsilon) | |
| y = float(gamma[idx]) * normalized + float(beta[idx]) | |
| outputs.append(y) | |
| return outputs | |
| def encode_relu(self, inputs): | |
| """Encode ReLU activation: max(0, x)""" | |
| outputs = [] | |
| for i, x in enumerate(inputs): | |
| y = Real(self._get_unique_var_name("relu")) | |
| outputs.append(y) | |
| return outputs | |
| def get_relu_constraints(self, inputs, outputs): | |
| """Generate Z3 constraints for ReLU layer""" | |
| constraints = [] | |
| for x, y in zip(inputs, outputs): | |
| constraints.append(y >= 0) | |
| constraints.append(y >= x) | |
| constraints.append(Or(y == 0, y == x)) | |
| return constraints | |
| def encode_leaky_relu(self, inputs, alpha=0.01): | |
| """Encode Leaky ReLU: y = x if x > 0 else alpha * x""" | |
| outputs = [] | |
| constraints = [] | |
| for x in inputs: | |
| y = Real(self._get_unique_var_name("leaky_relu")) | |
| # y = max(x, alpha * x) | |
| constraints.append(y >= x) | |
| constraints.append(y >= alpha * x) | |
| constraints.append(Or(y == x, y == alpha * x)) | |
| outputs.append(y) | |
| return outputs, constraints | |
| def encode_tanh_approx(self, inputs, num_segments=5): | |
| """ | |
| Encode tanh using piecewise linear approximation | |
| tanh(x) is bounded in [-1, 1] | |
| """ | |
| outputs = [] | |
| for x in inputs: | |
| # Simple 3-piece linear approximation | |
| # tanh(x) ≈ { -1 if x < -2, x if -2 <= x <= 2, 1 if x > 2 } | |
| y = Real(self._get_unique_var_name("tanh")) | |
| # For simplicity, using linear approximation in [-2, 2] | |
| # y ≈ 0.5 * x (clamped to [-1, 1]) | |
| outputs.append(If(x < -2, -1, If(x > 2, 1, 0.5 * x))) | |
| return outputs | |
| def encode_sigmoid_approx(self, inputs): | |
| """Encode sigmoid using piecewise linear approximation""" | |
| outputs = [] | |
| for x in inputs: | |
| # 3-piece approximation: | |
| # sigmoid(x) ≈ { 0 if x < -5, 0.5 + 0.25*x if -5 <= x <= 5, 1 if x > 5 } | |
| y = If(x < -5, 0, If(x > 5, 1, 0.5 + 0.25 * x)) | |
| outputs.append(y) | |
| return outputs | |
| def encode_softmax_approx(self, inputs): | |
| """ | |
| Encode softmax approximation | |
| Note: Exact softmax is very difficult in Z3 due to exponentials | |
| This uses a simplified constraint that outputs sum to 1 and are positive | |
| """ | |
| outputs = [] | |
| constraints = [] | |
| for i in range(len(inputs)): | |
| y = Real(self._get_unique_var_name("softmax")) | |
| constraints.append(y >= 0) | |
| constraints.append(y <= 1) | |
| outputs.append(y) | |
| # Constraint: outputs sum to 1 | |
| constraints.append(sum(outputs) == 1) | |
| # Approximate ordering preservation: if x_i > x_j then softmax_i > softmax_j | |
| for i in range(len(inputs)): | |
| for j in range(i + 1, len(inputs)): | |
| constraints.append(Implies(inputs[i] > inputs[j], outputs[i] > outputs[j])) | |
| return outputs, constraints | |
| def encode_flatten(self, inputs, input_shape): | |
| """Flatten operation (just returns inputs as-is since we work with flat arrays)""" | |
| return inputs, (np.prod(input_shape),) | |
| def encode_dropout(self, inputs): | |
| """Dropout is identity during inference""" | |
| return inputs | |
| def encode_add(self, inputs1, inputs2): | |
| """ | |
| Element-wise addition of two tensors | |
| Supports broadcasting for common cases | |
| """ | |
| # Simple case: same length | |
| if len(inputs1) == len(inputs2): | |
| return [x + y for x, y in zip(inputs1, inputs2)] | |
| # Broadcasting: one is scalar (length 1) | |
| elif len(inputs1) == 1: | |
| return [inputs1[0] + y for y in inputs2] | |
| elif len(inputs2) == 1: | |
| return [x + inputs2[0] for x in inputs1] | |
| # More complex broadcasting | |
| else: | |
| # Assume inputs2 broadcasts to inputs1 shape | |
| outputs = [] | |
| for i, x in enumerate(inputs1): | |
| y = inputs2[i % len(inputs2)] | |
| outputs.append(x + y) | |
| return outputs | |
| def encode_sub(self, inputs1, inputs2): | |
| """Element-wise subtraction""" | |
| if len(inputs1) == len(inputs2): | |
| return [x - y for x, y in zip(inputs1, inputs2)] | |
| elif len(inputs1) == 1: | |
| return [inputs1[0] - y for y in inputs2] | |
| elif len(inputs2) == 1: | |
| return [x - inputs2[0] for x in inputs1] | |
| else: | |
| outputs = [] | |
| for i, x in enumerate(inputs1): | |
| y = inputs2[i % len(inputs2)] | |
| outputs.append(x - y) | |
| return outputs | |
| def encode_mul(self, inputs1, inputs2): | |
| """Element-wise multiplication""" | |
| if len(inputs1) == len(inputs2): | |
| return [x * y for x, y in zip(inputs1, inputs2)] | |
| elif len(inputs1) == 1: | |
| return [inputs1[0] * y for y in inputs2] | |
| elif len(inputs2) == 1: | |
| return [x * inputs2[0] for x in inputs1] | |
| else: | |
| outputs = [] | |
| for i, x in enumerate(inputs1): | |
| y = inputs2[i % len(inputs2)] | |
| outputs.append(x * y) | |
| return outputs | |
| def encode_div(self, inputs1, inputs2): | |
| """Element-wise division""" | |
| if len(inputs1) == len(inputs2): | |
| return [x / y for x, y in zip(inputs1, inputs2)] | |
| elif len(inputs1) == 1: | |
| return [inputs1[0] / y for y in inputs2] | |
| elif len(inputs2) == 1: | |
| return [x / inputs2[0] for x in inputs1] | |
| else: | |
| outputs = [] | |
| for i, x in enumerate(inputs1): | |
| y = inputs2[i % len(inputs2)] | |
| outputs.append(x / y) | |
| return outputs | |
| def encode_concat(self, input_lists, axis=0): | |
| """ | |
| Concatenate multiple tensors along specified axis | |
| For flattened representations, just concatenates the lists | |
| """ | |
| # For simplified flat representation | |
| outputs = [] | |
| for inp_list in input_lists: | |
| outputs.extend(inp_list) | |
| return outputs | |
| def encode_split(self, inputs, num_outputs, axis=0): | |
| """ | |
| Split tensor into multiple outputs | |
| For flat representation, splits evenly | |
| """ | |
| chunk_size = len(inputs) // num_outputs | |
| outputs = [] | |
| for i in range(num_outputs): | |
| start = i * chunk_size | |
| end = start + chunk_size if i < num_outputs - 1 else len(inputs) | |
| outputs.append(inputs[start:end]) | |
| return outputs | |
| def encode_reshape(self, inputs, new_shape): | |
| """ | |
| Reshape operation - returns same variables with new shape metadata | |
| """ | |
| total_size = int(np.prod([s for s in new_shape if s != -1])) | |
| if total_size != len(inputs): | |
| # Handle -1 in shape | |
| new_shape_resolved = [] | |
| for s in new_shape: | |
| if s == -1: | |
| new_shape_resolved.append(len(inputs) // (total_size // np.prod([x for x in new_shape if x > 0]))) | |
| else: | |
| new_shape_resolved.append(s) | |
| new_shape = new_shape_resolved | |
| return inputs, tuple(new_shape) | |
| def encode_transpose(self, inputs, input_shape, perm): | |
| """ | |
| Transpose operation - permute dimensions | |
| """ | |
| # Convert flat inputs to multidimensional indexing | |
| input_array = np.array(inputs).reshape(input_shape) | |
| output_shape = tuple(input_shape[i] for i in perm) | |
| # Create mapping from old to new positions | |
| outputs = [None] * len(inputs) | |
| for old_idx in np.ndindex(input_shape): | |
| new_idx = tuple(old_idx[i] for i in perm) | |
| old_flat = np.ravel_multi_index(old_idx, input_shape) | |
| new_flat = np.ravel_multi_index(new_idx, output_shape) | |
| outputs[new_flat] = inputs[old_flat] | |
| return outputs, output_shape | |
| def encode_pad(self, inputs, input_shape, pads, mode='constant', constant_value=0): | |
| """ | |
| Pad operation - add padding around tensor | |
| pads: [x1_begin, x2_begin, ..., x1_end, x2_end, ...] | |
| """ | |
| ndim = len(input_shape) | |
| pads_begin = pads[:ndim] | |
| pads_end = pads[ndim:] | |
| output_shape = tuple(input_shape[i] + pads_begin[i] + pads_end[i] for i in range(ndim)) | |
| total_output = int(np.prod(output_shape)) | |
| outputs = [] | |
| for out_idx in np.ndindex(output_shape): | |
| # Check if this position is padding | |
| is_padding = False | |
| in_idx = [] | |
| for i, (o, pb, pe) in enumerate(zip(out_idx, pads_begin, pads_end)): | |
| if o < pb or o >= pb + input_shape[i]: | |
| is_padding = True | |
| break | |
| in_idx.append(o - pb) | |
| if is_padding: | |
| if mode == 'constant': | |
| outputs.append(constant_value) | |
| else: | |
| # For other modes, approximate with constant | |
| outputs.append(constant_value) | |
| else: | |
| in_flat = np.ravel_multi_index(tuple(in_idx), input_shape) | |
| outputs.append(inputs[in_flat]) | |
| return outputs, output_shape | |
| def encode_slice(self, inputs, input_shape, starts, ends, axes=None, steps=None): | |
| """ | |
| Slice operation - extract subset of tensor | |
| """ | |
| if axes is None: | |
| axes = list(range(len(input_shape))) | |
| if steps is None: | |
| steps = [1] * len(axes) | |
| # Build a dictionary mapping axis to slice parameters | |
| slice_params = {} | |
| for axis, start, end, step in zip(axes, starts, ends, steps): | |
| slice_params[axis] = (start, end, step) | |
| # Calculate output shape | |
| output_shape = [] | |
| for i, dim_size in enumerate(input_shape): | |
| if i in slice_params: | |
| start, end, step = slice_params[i] | |
| # Handle None and negative indices | |
| if start is None: | |
| start = 0 | |
| if end is None: | |
| end = dim_size | |
| if start < 0: | |
| start = dim_size + start | |
| if end < 0: | |
| end = dim_size + end | |
| # Calculate output size for this dimension | |
| output_shape.append(max(0, (end - start + step - 1) // step)) | |
| else: | |
| # Full dimension | |
| output_shape.append(dim_size) | |
| # Extract sliced elements | |
| output_indices = [] | |
| for in_idx in np.ndindex(input_shape): | |
| include = True | |
| for i, idx in enumerate(in_idx): | |
| if i in slice_params: | |
| start, end, step = slice_params[i] | |
| # Handle None and negative indices | |
| if start is None: | |
| start = 0 | |
| if end is None: | |
| end = input_shape[i] | |
| if start < 0: | |
| start = input_shape[i] + start | |
| if end < 0: | |
| end = input_shape[i] + end | |
| # Check if this index is included in the slice | |
| if idx < start or idx >= end or (idx - start) % step != 0: | |
| include = False | |
| break | |
| if include: | |
| in_flat = np.ravel_multi_index(in_idx, input_shape) | |
| output_indices.append(in_flat) | |
| outputs = [inputs[i] for i in output_indices] | |
| return outputs, tuple(output_shape) | |
| def encode_global_avg_pool(self, inputs, input_shape): | |
| """ | |
| Global average pooling - average over spatial dimensions | |
| input_shape: (C, H, W) -> output: (C,) | |
| """ | |
| C = input_shape[0] | |
| spatial_size = np.prod(input_shape[1:]) | |
| outputs = [] | |
| for c in range(C): | |
| channel_sum = sum(inputs[c * spatial_size + i] for i in range(spatial_size)) | |
| outputs.append(channel_sum / spatial_size) | |
| return outputs, (C,) | |
| def encode_global_max_pool(self, inputs, input_shape): | |
| """ | |
| Global max pooling - max over spatial dimensions | |
| """ | |
| C = input_shape[0] | |
| spatial_size = np.prod(input_shape[1:]) | |
| outputs = [] | |
| constraints = [] | |
| for c in range(C): | |
| channel_values = [inputs[c * spatial_size + i] for i in range(spatial_size)] | |
| y = Real(self._get_unique_var_name("global_maxpool")) | |
| for val in channel_values: | |
| constraints.append(y >= val) | |
| constraints.append(Or([y == val for val in channel_values])) | |
| outputs.append(y) | |
| return outputs, (C,), constraints | |
| def convert_model_auto(self): | |
| """ | |
| Automatically convert ONNX model by traversing the computation graph | |
| This is a more advanced version that reads the ONNX graph structure | |
| """ | |
| solver = Solver() | |
| # Get model input | |
| model_input = self.model.graph.input[0] | |
| input_shape = [dim.dim_value for dim in model_input.type.tensor_type.shape.dim] | |
| # Handle batch dimension | |
| if input_shape[0] == -1 or input_shape[0] == 1: | |
| input_shape = input_shape[1:] | |
| # Create input variables | |
| inputs = self.create_input_vars(input_shape, "input") | |
| self.z3_vars['input'] = inputs | |
| logging.info(f"Model input shape: {input_shape}") | |
| logging.info(f"Number of nodes in graph: {len(self.model.graph.node)}") | |
| # This is a simplified version - full implementation would traverse the graph | |
| logging.info("\nNote: Full automatic conversion requires parsing ONNX node operations.") | |
| logging.info("For now, use the manual conversion methods for specific architectures.") | |
| return solver, inputs | |
| def convert_simple_network(self, input_dim, hidden_dims, output_dim): | |
| """ | |
| Convert a simple feedforward network to Z3 constraints | |
| """ | |
| solver = Solver() | |
| # Create input variables | |
| inputs = self.create_input_vars([input_dim], "input") | |
| self.z3_vars['input'] = inputs | |
| current_layer = inputs | |
| layer_idx = 0 | |
| # Process hidden layers | |
| for hidden_dim in hidden_dims: | |
| # Dense layer | |
| dense_out = self.encode_dense_layer( | |
| current_layer, | |
| f"weight_{layer_idx}", | |
| f"bias_{layer_idx}" | |
| ) | |
| # ReLU activation | |
| relu_vars = self.encode_relu(dense_out) | |
| relu_constraints = self.get_relu_constraints(dense_out, relu_vars) | |
| solver.add(relu_constraints) | |
| self.z3_vars[f'layer_{layer_idx}_relu'] = relu_vars | |
| current_layer = relu_vars | |
| layer_idx += 1 | |
| # Output layer | |
| outputs = self.encode_dense_layer( | |
| current_layer, | |
| f"weight_{layer_idx}", | |
| f"bias_{layer_idx}" | |
| ) | |
| self.z3_vars['output'] = outputs | |
| return solver, inputs, outputs | |
| def convert_cnn_network(self, input_shape, conv_configs, fc_configs): | |
| """ | |
| Convert a CNN network to Z3 constraints | |
| Args: | |
| input_shape: (C, H, W) | |
| conv_configs: list of dicts with conv layer params | |
| fc_configs: list of (input_dim, output_dim) for FC layers | |
| """ | |
| solver = Solver() | |
| # Create input variables | |
| inputs = self.create_input_vars(input_shape, "input") | |
| self.z3_vars['input'] = inputs | |
| current_layer = inputs | |
| current_shape = input_shape | |
| # Process convolutional layers | |
| for i, config in enumerate(conv_configs): | |
| logging.info(f"Processing conv layer {i}, input shape: {current_shape}") | |
| # Convolution | |
| conv_out, out_shape = self.encode_conv2d( | |
| current_layer, current_shape, | |
| config['weight'], config.get('bias'), | |
| config.get('stride', (1, 1)), | |
| config.get('padding', (0, 0)) | |
| ) | |
| # Activation | |
| if config.get('activation') == 'relu': | |
| relu_vars = self.encode_relu(conv_out) | |
| relu_constraints = self.get_relu_constraints(conv_out, relu_vars) | |
| solver.add(relu_constraints) | |
| current_layer = relu_vars | |
| else: | |
| current_layer = conv_out | |
| current_shape = out_shape | |
| # Pooling | |
| if 'pool' in config: | |
| if config['pool'] == 'max': | |
| pool_out, pool_shape, pool_constraints = self.encode_maxpool2d( | |
| current_layer, current_shape, | |
| config.get('pool_size', (2, 2)) | |
| ) | |
| solver.add(pool_constraints) | |
| current_layer = pool_out | |
| current_shape = pool_shape | |
| # Flatten | |
| current_layer, _ = self.encode_flatten(current_layer, current_shape) | |
| # Process fully connected layers | |
| for i, (in_dim, out_dim) in enumerate(fc_configs): | |
| dense_out = self.encode_dense_layer( | |
| current_layer, | |
| f"fc_weight_{i}", | |
| f"fc_bias_{i}" | |
| ) | |
| if i < len(fc_configs) - 1: # Not the last layer | |
| relu_vars = self.encode_relu(dense_out) | |
| relu_constraints = self.get_relu_constraints(dense_out, relu_vars) | |
| solver.add(relu_constraints) | |
| current_layer = relu_vars | |
| else: | |
| current_layer = dense_out | |
| self.z3_vars['output'] = current_layer | |
| return solver, inputs, current_layer | |
| def convert_resnet_block(self, inputs, input_shape, config): | |
| """ | |
| Convert a ResNet-style residual block | |
| Args: | |
| inputs: input Z3 variables | |
| input_shape: current shape | |
| config: dict with conv configs for the block | |
| Returns: | |
| outputs, output_shape, constraints | |
| """ | |
| constraints = [] | |
| # Main path | |
| main_path = inputs | |
| main_shape = input_shape | |
| for conv_cfg in config['main_path']: | |
| # Convolution | |
| conv_out, conv_shape = self.encode_conv2d( | |
| main_path, main_shape, | |
| conv_cfg['weight'], conv_cfg.get('bias'), | |
| conv_cfg.get('stride', (1, 1)), | |
| conv_cfg.get('padding', (0, 0)) | |
| ) | |
| # Batch norm (if specified) | |
| if 'batchnorm' in conv_cfg: | |
| bn_cfg = conv_cfg['batchnorm'] | |
| conv_out = self.encode_batchnorm( | |
| conv_out, bn_cfg['gamma'], bn_cfg['beta'], | |
| bn_cfg['mean'], bn_cfg['var'] | |
| ) | |
| # Activation (if not last layer in block) | |
| if conv_cfg.get('activation') == 'relu': | |
| relu_vars = self.encode_relu(conv_out) | |
| relu_constraints = self.get_relu_constraints(conv_out, relu_vars) | |
| constraints.extend(relu_constraints) | |
| main_path = relu_vars | |
| else: | |
| main_path = conv_out | |
| main_shape = conv_shape | |
| # Shortcut/skip connection | |
| if 'shortcut' in config: | |
| shortcut_cfg = config['shortcut'] | |
| if shortcut_cfg.get('type') == 'conv': | |
| # 1x1 convolution for dimension matching | |
| shortcut, shortcut_shape = self.encode_conv2d( | |
| inputs, input_shape, | |
| shortcut_cfg['weight'], shortcut_cfg.get('bias'), | |
| shortcut_cfg.get('stride', (1, 1)), | |
| (0, 0) | |
| ) | |
| elif shortcut_cfg.get('type') == 'pool': | |
| # Pooling for dimension matching | |
| shortcut, shortcut_shape, pool_constraints = self.encode_maxpool2d( | |
| inputs, input_shape, | |
| shortcut_cfg.get('pool_size', (2, 2)), | |
| shortcut_cfg.get('stride', (2, 2)) | |
| ) | |
| constraints.extend(pool_constraints) | |
| else: | |
| # Identity shortcut | |
| shortcut = inputs | |
| shortcut_shape = input_shape | |
| # Add: main_path + shortcut | |
| outputs = self.encode_add(main_path, shortcut) | |
| # Final ReLU | |
| if config.get('final_activation') == 'relu': | |
| relu_vars = self.encode_relu(outputs) | |
| relu_constraints = self.get_relu_constraints(outputs, relu_vars) | |
| constraints.extend(relu_constraints) | |
| outputs = relu_vars | |
| return outputs, main_shape, constraints | |
| def save_solver_to_file(self, solver, filename): | |
| """ | |
| Save Z3 solver constraints to SMT-LIB2 format file | |
| Args: | |
| solver: Z3 Solver object | |
| filename: output file path | |
| """ | |
| with open(filename, 'w') as f: | |
| f.write(solver.to_smt2()) | |
| logging.info(f"Saved solver constraints to: {filename}") | |
| def save_model_info(self, filename, input_vars, output_vars, solver): | |
| """ | |
| Save human-readable model information | |
| Args: | |
| filename: output file path | |
| input_vars: list of input Z3 variables | |
| output_vars: list of output Z3 variables | |
| solver: Z3 Solver object | |
| """ | |
| with open(filename, 'w') as f: | |
| f.write("="*70 + "\n") | |
| f.write("ONNX to Z3 Conversion Summary\n") | |
| f.write("="*70 + "\n\n") | |
| f.write(f"Number of input variables: {len(input_vars)}\n") | |
| f.write(f"Input variables: {[str(v) for v in input_vars[:10]]}") | |
| if len(input_vars) > 10: | |
| f.write(f" ... ({len(input_vars)-10} more)") | |
| f.write("\n\n") | |
| f.write(f"Number of output variables: {len(output_vars)}\n") | |
| f.write(f"Output variables: {[str(v) for v in output_vars]}\n\n") | |
| f.write(f"Number of constraints: {len(solver.assertions())}\n\n") | |
| f.write("First 10 constraints:\n") | |
| for i, assertion in enumerate(solver.assertions()[:10]): | |
| f.write(f" {i+1}. {assertion}\n") | |
| if len(solver.assertions()) > 10: | |
| f.write(f" ... ({len(solver.assertions())-10} more constraints)\n") | |
| f.write("\n" + "="*70 + "\n") | |
| logging.info(f"Saved model info to: {filename}") | |
| def create_example_onnx_model(): | |
| """Create a simple ONNX model for demonstration""" | |
| import onnx.helper as helper | |
| # Define tensors | |
| input_tensor = helper.make_tensor_value_info('input', onnx.TensorProto.FLOAT, [1, 2]) | |
| output_tensor = helper.make_tensor_value_info('output', onnx.TensorProto.FLOAT, [1, 1]) | |
| # Create weights | |
| W0 = numpy_helper.from_array(np.random.randn(2, 4).astype(np.float32), 'weight_0') | |
| b0 = numpy_helper.from_array(np.random.randn(4).astype(np.float32), 'bias_0') | |
| W1 = numpy_helper.from_array(np.random.randn(4, 3).astype(np.float32), 'weight_1') | |
| b1 = numpy_helper.from_array(np.random.randn(3).astype(np.float32), 'bias_1') | |
| W2 = numpy_helper.from_array(np.random.randn(3, 1).astype(np.float32), 'weight_2') | |
| b2 = numpy_helper.from_array(np.random.randn(1).astype(np.float32), 'bias_2') | |
| # Create nodes for the computation graph | |
| nodes = [ | |
| helper.make_node('MatMul', ['input', 'weight_0'], ['matmul_0']), | |
| helper.make_node('Add', ['matmul_0', 'bias_0'], ['layer_0']), | |
| helper.make_node('Relu', ['layer_0'], ['relu_0']), | |
| helper.make_node('MatMul', ['relu_0', 'weight_1'], ['matmul_1']), | |
| helper.make_node('Add', ['matmul_1', 'bias_1'], ['layer_1']), | |
| helper.make_node('Relu', ['layer_1'], ['relu_1']), | |
| helper.make_node('MatMul', ['relu_1', 'weight_2'], ['matmul_2']), | |
| helper.make_node('Add', ['matmul_2', 'bias_2'], ['output']), | |
| ] | |
| graph_def = helper.make_graph( | |
| nodes=nodes, # Add the nodes! | |
| name='SimpleNN', | |
| inputs=[input_tensor], | |
| outputs=[output_tensor], | |
| initializer=[W0, b0, W1, b1, W2, b2] | |
| ) | |
| model_def = helper.make_model(graph_def, producer_name='onnx-example') | |
| return model_def | |
| def create_example_cnn_model(): | |
| """Create a simple CNN model for demonstration""" | |
| import onnx.helper as helper | |
| input_tensor = helper.make_tensor_value_info('input', onnx.TensorProto.FLOAT, [1, 1, 28, 28]) | |
| output_tensor = helper.make_tensor_value_info('output', onnx.TensorProto.FLOAT, [1, 10]) | |
| conv_w = numpy_helper.from_array(np.random.randn(8, 1, 3, 3).astype(np.float32), 'conv_weight_0') | |
| conv_b = numpy_helper.from_array(np.random.randn(8).astype(np.float32), 'conv_bias_0') | |
| fc_w = numpy_helper.from_array(np.random.randn(8*13*13, 10).astype(np.float32), 'fc_weight_0') | |
| fc_b = numpy_helper.from_array(np.random.randn(10).astype(np.float32), 'fc_bias_0') | |
| graph_def = helper.make_graph( | |
| nodes=[], | |
| name='SimpleCNN', | |
| inputs=[input_tensor], | |
| outputs=[output_tensor], | |
| initializer=[conv_w, conv_b, fc_w, fc_b] | |
| ) | |
| model_def = helper.make_model(graph_def, producer_name='onnx-example') | |
| return model_def | |
| def demonstrate_resnet_block(rootpath="."): | |
| """Demonstrate ResNet block conversion""" | |
| logging.info("\n[Example 3] ResNet Block with Skip Connection") | |
| logging.info("-" * 70) | |
| import onnx.helper as helper | |
| input_tensor = helper.make_tensor_value_info('input', onnx.TensorProto.FLOAT, [1, 16, 8, 8]) | |
| output_tensor = helper.make_tensor_value_info('output', onnx.TensorProto.FLOAT, [1, 16, 8, 8]) | |
| conv1_w = numpy_helper.from_array(np.random.randn(16, 16, 3, 3).astype(np.float32), 'res_conv1_w') | |
| conv1_b = numpy_helper.from_array(np.random.randn(16).astype(np.float32), 'res_conv1_b') | |
| bn1_gamma = numpy_helper.from_array(np.ones(16).astype(np.float32), 'res_bn1_gamma') | |
| bn1_beta = numpy_helper.from_array(np.zeros(16).astype(np.float32), 'res_bn1_beta') | |
| bn1_mean = numpy_helper.from_array(np.zeros(16).astype(np.float32), 'res_bn1_mean') | |
| bn1_var = numpy_helper.from_array(np.ones(16).astype(np.float32), 'res_bn1_var') | |
| conv2_w = numpy_helper.from_array(np.random.randn(16, 16, 3, 3).astype(np.float32), 'res_conv2_w') | |
| conv2_b = numpy_helper.from_array(np.random.randn(16).astype(np.float32), 'res_conv2_b') | |
| bn2_gamma = numpy_helper.from_array(np.ones(16).astype(np.float32), 'res_bn2_gamma') | |
| bn2_beta = numpy_helper.from_array(np.zeros(16).astype(np.float32), 'res_bn2_beta') | |
| bn2_mean = numpy_helper.from_array(np.zeros(16).astype(np.float32), 'res_bn2_mean') | |
| bn2_var = numpy_helper.from_array(np.ones(16).astype(np.float32), 'res_bn2_var') | |
| graph_def = helper.make_graph( | |
| nodes=[], | |
| name='ResNetBlock', | |
| inputs=[input_tensor], | |
| outputs=[output_tensor], | |
| initializer=[conv1_w, conv1_b, bn1_gamma, bn1_beta, bn1_mean, bn1_var, | |
| conv2_w, conv2_b, bn2_gamma, bn2_beta, bn2_mean, bn2_var] | |
| ) | |
| model_def = helper.make_model(graph_def, producer_name='resnet-example') | |
| onnx.save(model_def, f'{rootpath}/resnet_block.onnx') | |
| converter = ONNXToZ3Converter(f'{rootpath}/resnet_block.onnx') | |
| resnet_config = { | |
| 'main_path': [ | |
| { | |
| 'weight': 'res_conv1_w', | |
| 'bias': 'res_conv1_b', | |
| 'padding': (1, 1), | |
| 'batchnorm': { | |
| 'gamma': 'res_bn1_gamma', | |
| 'beta': 'res_bn1_beta', | |
| 'mean': 'res_bn1_mean', | |
| 'var': 'res_bn1_var' | |
| }, | |
| 'activation': 'relu' | |
| }, | |
| { | |
| 'weight': 'res_conv2_w', | |
| 'bias': 'res_conv2_b', | |
| 'padding': (1, 1), | |
| 'batchnorm': { | |
| 'gamma': 'res_bn2_gamma', | |
| 'beta': 'res_bn2_beta', | |
| 'mean': 'res_bn2_mean', | |
| 'var': 'res_bn2_var' | |
| } | |
| } | |
| ], | |
| 'final_activation': 'relu' | |
| } | |
| inputs = converter.create_input_vars((16, 8, 8), "resnet_input") | |
| outputs, out_shape, constraints = converter.convert_resnet_block( | |
| inputs, (16, 8, 8), resnet_config | |
| ) | |
| logging.info(f"Input shape: (16, 8, 8) = {16*8*8} variables") | |
| logging.info(f"Output shape: {out_shape} = {len(outputs)} variables") | |
| logging.info(f"Constraints generated: {len(constraints)}") | |
| logging.info("ResNet block structure:") | |
| logging.info(" Input -> Conv3x3 -> BN -> ReLU -> Conv3x3 -> BN -> (+) -> ReLU -> Output") | |
| logging.info(" | ^") | |
| logging.info(" +------------ Identity Shortcut ----------+") | |
| def demonstrate_element_wise_ops(rootpath="."): | |
| """Demonstrate element-wise operations""" | |
| logging.info("\n[Example 4] Element-wise Operations (Add, Mul, Concat)") | |
| logging.info("-" * 70) | |
| import onnx.helper as helper | |
| input_tensor = helper.make_tensor_value_info('input', onnx.TensorProto.FLOAT, [1, 4]) | |
| output_tensor = helper.make_tensor_value_info('output', onnx.TensorProto.FLOAT, [1, 8]) | |
| w1 = numpy_helper.from_array(np.random.randn(4, 4).astype(np.float32), 'w1') | |
| b1 = numpy_helper.from_array(np.random.randn(4).astype(np.float32), 'b1') | |
| w2 = numpy_helper.from_array(np.random.randn(4, 4).astype(np.float32), 'w2') | |
| b2 = numpy_helper.from_array(np.random.randn(4).astype(np.float32), 'b2') | |
| graph_def = helper.make_graph( | |
| nodes=[], | |
| name='MultiPath', | |
| inputs=[input_tensor], | |
| outputs=[output_tensor], | |
| initializer=[w1, b1, w2, b2] | |
| ) | |
| model_def = helper.make_model(graph_def, producer_name='multipath-example') | |
| onnx.save(model_def, f'{rootpath}/multipath.onnx') | |
| converter = ONNXToZ3Converter(f'{rootpath}/multipath.onnx') | |
| inputs = converter.create_input_vars([4], "mp_input") | |
| branch1 = converter.encode_dense_layer(inputs, 'w1', 'b1') | |
| branch1_relu = converter.encode_relu(branch1) | |
| branch2 = converter.encode_dense_layer(inputs, 'w2', 'b2') | |
| branch2_relu = converter.encode_relu(branch2) | |
| multiplied = converter.encode_mul(branch1, branch2) | |
| added = converter.encode_add(branch1_relu, branch2_relu) | |
| concatenated = converter.encode_concat([branch1_relu, branch2_relu]) | |
| logging.info(f"Input: 4 variables") | |
| logging.info(f"Branch 1 (FC->ReLU): {len(branch1_relu)} variables") | |
| logging.info(f"Branch 2 (FC->ReLU): {len(branch2_relu)} variables") | |
| logging.info(f"Element-wise multiply: {len(multiplied)} variables") | |
| logging.info(f"Element-wise add: {len(added)} variables") | |
| logging.info(f"Concatenated output: {len(concatenated)} variables") | |
| logging.info("\nNetwork structure:") | |
| logging.info(" +--> FC -> ReLU --+") | |
| logging.info(" | |") | |
| logging.info(" Input --> -+ +-> Concat -> Output") | |
| logging.info(" | |") | |
| logging.info(" +--> FC -> ReLU --+") | |
| def demonstrate_advanced_ops(): | |
| """Demonstrate advanced operations""" | |
| logging.info("\n[Example 5] Advanced Operations (Transpose, Slice, Pad)") | |
| logging.info("-" * 70) | |
| converter = ONNXToZ3Converter(onnx_model=create_example_onnx_model()) | |
| inputs = [Real(f"x_{i}") for i in range(12)] | |
| input_shape = (3, 4) | |
| logging.info("\n1. Transpose (3, 4) -> (4, 3)") | |
| transposed, trans_shape = converter.encode_transpose(inputs, input_shape, perm=[1, 0]) | |
| logging.info(f" Output shape: {trans_shape}") | |
| logging.info("\n2. Reshape (3, 4) -> (2, 6)") | |
| reshaped, reshape_out = converter.encode_reshape(inputs, (2, 6)) | |
| logging.info(f" Output shape: {reshape_out}") | |
| logging.info("\n3. Slice: extract first 2 channels") | |
| sliced, slice_shape = converter.encode_slice(inputs, input_shape, starts=[0], ends=[2], axes=[0]) | |
| logging.info(f" Output shape: {slice_shape}, {len(sliced)} elements") | |
| logging.info("\n4. Pad (3, 4) with 1 zero on each side") | |
| padded, pad_shape = converter.encode_pad(inputs, input_shape, pads=[1, 1, 1, 1], mode='constant', constant_value=0) | |
| logging.info(f" Output shape: {pad_shape}, {len(padded)} elements") | |
| logging.info("\n5. Global Average Pooling (2, 4, 6) -> (2,)") | |
| img_inputs = [Real(f"img_{i}") for i in range(48)] | |
| global_avg, gavg_shape = converter.encode_global_avg_pool(img_inputs, (2, 4, 6)) | |
| logging.info(f" Output shape: {gavg_shape}, {len(global_avg)} elements") | |
| def test_adversarial_robustness(rootpath="."): | |
| """Test: Find adversarial example for a simple classifier""" | |
| logging.info("\n[Test 1] Adversarial Robustness Verification") | |
| logging.info("-" * 70) | |
| logging.info("Task: Find small perturbation that changes classification") | |
| # Create a simple 2D classifier: 2 inputs -> 2 outputs (binary classification) | |
| import onnx.helper as helper | |
| input_tensor = helper.make_tensor_value_info('input', onnx.TensorProto.FLOAT, [1, 2]) | |
| output_tensor = helper.make_tensor_value_info('output', onnx.TensorProto.FLOAT, [1, 2]) | |
| # Create a classifier that separates points by line y = x | |
| # Class 0 if x0 > x1, Class 1 if x1 > x0 | |
| W = np.array([[1.0, -1.0], [-1.0, 1.0]], dtype=np.float32) | |
| b = np.array([0.0, 0.0], dtype=np.float32) | |
| w_init = numpy_helper.from_array(W, 'w') | |
| b_init = numpy_helper.from_array(b, 'b') | |
| graph_def = helper.make_graph( | |
| nodes=[], | |
| name='SimpleClassifier', | |
| inputs=[input_tensor], | |
| outputs=[output_tensor], | |
| initializer=[w_init, b_init] | |
| ) | |
| model_def = helper.make_model(graph_def, producer_name='test-classifier') | |
| onnx.save(model_def, f'{rootpath}/test_classifier.onnx') | |
| converter = ONNXToZ3Converter(f'{rootpath}/test_classifier.onnx') | |
| solver = Solver() | |
| # Original input point (should classify as class 0) | |
| x0_orig = 2.0 | |
| x1_orig = 1.0 | |
| # Create Z3 variables for perturbed input | |
| x0 = Real('x0') | |
| x1 = Real('x1') | |
| inputs = [x0, x1] | |
| # Encode the network | |
| outputs = converter.encode_dense_layer(inputs, 'w', 'b') | |
| # Constraint: small perturbation (L-infinity norm <= 0.5) | |
| epsilon = 0.5 | |
| solver.add(x0 >= x0_orig - epsilon, x0 <= x0_orig + epsilon) | |
| solver.add(x1 >= x1_orig - epsilon, x1 <= x1_orig + epsilon) | |
| # Original classification: output[0] > output[1] (class 0) | |
| # We want adversarial: output[1] > output[0] (class 1) | |
| solver.add(outputs[1] > outputs[0]) | |
| logging.info(f"Original point: ({x0_orig}, {x1_orig})") | |
| logging.info(f"Original scores: class0={W[0,0]*x0_orig + W[0,1]*x1_orig + b[0]:.2f}, class1={W[1,0]*x0_orig + W[1,1]*x1_orig + b[1]:.2f}") | |
| logging.info(f"Perturbation budget: epsilon = {epsilon}") | |
| logging.info("\nSearching for adversarial example...") | |
| result = solver.check() | |
| if result == sat: | |
| m = solver.model() | |
| x0_adv = float(m[x0].as_decimal(3).rstrip('?')) | |
| x1_adv = float(m[x1].as_decimal(3).rstrip('?')) | |
| score0 = W[0,0]*x0_adv + W[0,1]*x1_adv + b[0] | |
| score1 = W[1,0]*x0_adv + W[1,1]*x1_adv + b[1] | |
| logging.info(f"✓ Adversarial example found!") | |
| logging.info(f" Perturbed point: ({x0_adv:.3f}, {x1_adv:.3f})") | |
| logging.info(f" Perturbation: ({x0_adv - x0_orig:.3f}, {x1_adv - x1_orig:.3f})") | |
| logging.info(f" New scores: class0={score0:.2f}, class1={score1:.2f}") | |
| logging.info(f" Classification changed: class 0 -> class 1") | |
| converter.save_solver_to_file(solver, f'{rootpath}/adversarial_solver.smt2') | |
| converter.save_model_info(f'{rootpath}/adversarial_info.txt', inputs, outputs, solver) | |
| return 1 | |
| else: | |
| logging.info("✗ No adversarial example found within epsilon bound") | |
| return 0 | |
| def test_property_verification(rootpath="."): | |
| """Test: Verify that network output stays within bounds""" | |
| logging.info("\n[Test 2] Property Verification") | |
| logging.info("-" * 70) | |
| logging.info("Task: Prove output is always positive for positive inputs") | |
| import onnx.helper as helper | |
| input_tensor = helper.make_tensor_value_info('input', onnx.TensorProto.FLOAT, [1, 2]) | |
| output_tensor = helper.make_tensor_value_info('output', onnx.TensorProto.FLOAT, [1, 1]) | |
| # Network: Linear -> ReLU -> Linear (always positive output for positive weights) | |
| W1 = np.array([[1.0, 1.0], [1.0, 1.0]], dtype=np.float32) | |
| b1 = np.array([0.0, 0.0], dtype=np.float32) | |
| W2 = np.array([[1.0], [1.0]], dtype=np.float32) | |
| b2 = np.array([0.0], dtype=np.float32) | |
| graph_def = helper.make_graph( | |
| nodes=[], | |
| name='PositiveNet', | |
| inputs=[input_tensor], | |
| outputs=[output_tensor], | |
| initializer=[ | |
| numpy_helper.from_array(W1, 'w1'), | |
| numpy_helper.from_array(b1, 'b1'), | |
| numpy_helper.from_array(W2, 'w2'), | |
| numpy_helper.from_array(b2, 'b2') | |
| ] | |
| ) | |
| model_def = helper.make_model(graph_def, producer_name='positive-net') | |
| onnx.save(model_def, f'{rootpath}/positive_net.onnx') | |
| converter = ONNXToZ3Converter(f'{rootpath}/positive_net.onnx') | |
| solver = Solver() | |
| # Create inputs | |
| x0 = Real('x0') | |
| x1 = Real('x1') | |
| inputs = [x0, x1] | |
| # Encode network | |
| layer1 = converter.encode_dense_layer(inputs, 'w1', 'b1') | |
| layer1_relu = converter.encode_relu(layer1) | |
| relu_constraints = converter.get_relu_constraints(layer1, layer1_relu) | |
| solver.add(relu_constraints) | |
| output = converter.encode_dense_layer(layer1_relu, 'w2', 'b2') | |
| # Property: inputs are positive | |
| solver.add(x0 >= 0, x1 >= 0) | |
| solver.add(x0 <= 10, x1 <= 10) # Bounded domain | |
| # Try to find counterexample: output is negative | |
| solver.add(output[0] < 0) | |
| logging.info("Property to verify: output >= 0 for all inputs in [0, 10] x [0, 10]") | |
| logging.info("Searching for counterexample (output < 0)...") | |
| result = solver.check() | |
| if result == unsat: | |
| logging.info("✓ Property VERIFIED: Output is always non-negative!") | |
| logging.info(" No counterexample exists.") | |
| # Save the verification problem | |
| converter.save_solver_to_file(solver, f'{rootpath}/property_verification_solver.smt2') | |
| converter.save_model_info(f'{rootpath}/property_verification_info.txt', inputs, output, solver) | |
| return 0 | |
| else: | |
| logging.info("✗ Property VIOLATED: Found counterexample") | |
| m = solver.model() | |
| logging.info(f" Input: ({m[x0]}, {m[x1]})") | |
| logging.info(f" Output: {m.evaluate(output[0])}") | |
| return 1 | |
| def test_input_bounds(rootpath="."): | |
| """Test: Find valid inputs that produce desired output""" | |
| logging.info("\n[Test 3] Input Synthesis") | |
| logging.info("-" * 70) | |
| logging.info("Task: Find inputs that produce output in target range [5, 6]") | |
| import onnx.helper as helper | |
| input_tensor = helper.make_tensor_value_info('input', onnx.TensorProto.FLOAT, [1, 2]) | |
| output_tensor = helper.make_tensor_value_info('output', onnx.TensorProto.FLOAT, [1, 1]) | |
| # Simple network: weighted sum | |
| W = np.array([[2.0], [3.0]], dtype=np.float32) | |
| b = np.array([1.0], dtype=np.float32) | |
| graph_def = helper.make_graph( | |
| nodes=[], | |
| name='SumNet', | |
| inputs=[input_tensor], | |
| outputs=[output_tensor], | |
| initializer=[ | |
| numpy_helper.from_array(W, 'w'), | |
| numpy_helper.from_array(b, 'b') | |
| ] | |
| ) | |
| model_def = helper.make_model(graph_def, producer_name='sum-net') | |
| onnx.save(model_def, f'{rootpath}/sum_net.onnx') | |
| converter = ONNXToZ3Converter(f'{rootpath}/sum_net.onnx') | |
| solver = Solver() | |
| x0 = Real('x0') | |
| x1 = Real('x1') | |
| inputs = [x0, x1] | |
| output = converter.encode_dense_layer(inputs, 'w', 'b') | |
| # Constraints: bounded inputs | |
| solver.add(x0 >= 0, x0 <= 5) | |
| solver.add(x1 >= 0, x1 <= 5) | |
| # Target: output in [5, 6] | |
| solver.add(output[0] >= 5) | |
| solver.add(output[0] <= 6) | |
| logging.info("Network: output = 2*x0 + 3*x1 + 1") | |
| logging.info("Target output range: [5, 6]") | |
| logging.info("Input constraints: x0, x1 in [0, 5]") | |
| logging.info("\nSearching for valid inputs...") | |
| result = solver.check() | |
| if result == sat: | |
| m = solver.model() | |
| x0_val = float(m[x0].as_decimal(3).rstrip('?')) | |
| x1_val = float(m[x1].as_decimal(3).rstrip('?')) | |
| output_val = 2*x0_val + 3*x1_val + 1 | |
| logging.info(f"✓ Solution found!") | |
| logging.info(f" Input: x0={x0_val:.3f}, x1={x1_val:.3f}") | |
| logging.info(f" Output: {output_val:.3f}") | |
| logging.info(f" Verification: 2*{x0_val:.3f} + 3*{x1_val:.3f} + 1 = {output_val:.3f}") | |
| # Save the input synthesis problem | |
| converter.save_solver_to_file(solver, f'{rootpath}/input_synthesis_solver.smt2') | |
| converter.save_model_info(f'{rootpath}/input_synthesis_info.txt', inputs, output, solver) | |
| return 0 | |
| else: | |
| logging.info("✗ No solution found") | |
| return 1 | |
| def run_all_tests(rootpath="."): | |
| """Run all verification tests""" | |
| logging.info("\n" + "="*70) | |
| logging.info("RUNNING VERIFICATION TESTS") | |
| logging.info("="*70) | |
| rc = [test_adversarial_robustness(rootpath), test_property_verification(rootpath), test_input_bounds(rootpath)] | |
| logging.info("\n" + "="*70) | |
| logging.info(f"ALL TESTS COMPLETED with return codes {rc}") | |
| logging.info("\n" + "="*70) | |
| return "\n".join([str(rc[0]),str(rc[1]),str(rc[2])]) | |
| def calculate_checksums(rootpath="."): | |
| files=['multipath.onnx','positive_net.onnx','resnet_block.onnx','simple_cnn.onnx', | |
| 'simple_nn.onnx', 'sum_net.onnx', 'test_classifier.onnx', | |
| 'input_synthesis_solver.smt2', 'input_synthesis_info.txt', | |
| 'cnn_solver.smt2', 'cnn_info.txt', | |
| 'property_verification_solver.smt2', 'property_verification_info.txt', | |
| 'simple_nn_solver.smt2', 'simple_nn_info.txt', 'simple_nn.json'] | |
| check_sums=[] | |
| for file in files: | |
| check_sums.append(''.join([file,"=",calculate_file_checksum(f"{rootpath}/{file}")])) | |
| return "\n".join(check_sums) | |
| def main(rootpath="."): | |
| np.random.seed(42) | |
| logging.info("="*70) | |
| logging.info("ONNX to Z3 Converter - Extended Version") | |
| logging.info("="*70) | |
| # Example 1: Simple feedforward network | |
| logging.info("\n[Example 1] Simple Feedforward Network") | |
| logging.info("-" * 70) | |
| model = create_example_onnx_model() | |
| onnx.save(model, f'{rootpath}/simple_nn.onnx') | |
| error_code = onnx2json(rootpath, "simple_nn.onnx","simple_nn.json") | |
| if(error_code): | |
| logging.error("ERROR: onnx2json conversion failed") | |
| converter = ONNXToZ3Converter(f'{rootpath}/simple_nn.onnx') | |
| solver, inputs, outputs = converter.convert_simple_network( | |
| input_dim=2, | |
| hidden_dims=[4, 3], | |
| output_dim=1 | |
| ) | |
| logging.info(f"Created {len(inputs)} input variables") | |
| logging.info(f"Created {len(outputs)} output variables") | |
| logging.info(f"Solver has {len(solver.assertions())} constraints") | |
| converter.save_solver_to_file(solver, f'{rootpath}/simple_nn_solver.smt2') | |
| converter.save_model_info(f'{rootpath}/simple_nn_info.txt', inputs, outputs, solver) | |
| logging.info("\nVerification: Can output exceed 10 with input in [-1, 1]?") | |
| for inp in inputs: | |
| solver.add(inp >= -1, inp <= 1) | |
| solver.add(outputs[0] > 10) | |
| result = solver.check() | |
| logging.info(f"Result: {result}") | |
| if result == sat: | |
| m = solver.model() | |
| logging.info(f" Counterexample found!") | |
| logging.info(f" Input: {[m.evaluate(inp) for inp in inputs]}") | |
| logging.info(f" Output: {m.evaluate(outputs[0])}") | |
| # Example 2: CNN network | |
| logging.info("\n[Example 2] CNN Network") | |
| logging.info("-" * 70) | |
| cnn_model = create_example_cnn_model() | |
| onnx.save(cnn_model, f'{rootpath}/simple_cnn.onnx') | |
| cnn_converter = ONNXToZ3Converter(f'{rootpath}/simple_cnn.onnx') | |
| small_conv_configs = [ | |
| { | |
| 'weight': 'conv_weight_0', | |
| 'bias': 'conv_bias_0', | |
| 'stride': (1, 1), | |
| 'padding': (0, 0), | |
| 'activation': 'relu' | |
| } | |
| ] | |
| cnn_converter.initializers['fc_weight_0'] = np.random.randn(8*6*6, 10).astype(np.float32) | |
| logging.info("Creating CNN with:") | |
| logging.info(" Input: 1x8x8") | |
| logging.info(" Conv: 3x3, 8 filters -> ReLU") | |
| logging.info(" Flatten -> FC(10)") | |
| cnn_solver, cnn_inputs, cnn_outputs = cnn_converter.convert_cnn_network( | |
| input_shape=(1, 8, 8), | |
| conv_configs=small_conv_configs, | |
| fc_configs=[(8*6*6, 10)] | |
| ) | |
| logging.info(f"\nCreated {len(cnn_inputs)} input variables") | |
| logging.info(f"Created {len(cnn_outputs)} output variables") | |
| logging.info(f"Solver has {len(cnn_solver.assertions())} constraints") | |
| # Save CNN solver | |
| cnn_converter.save_solver_to_file(cnn_solver, f'{rootpath}/cnn_solver.smt2') | |
| cnn_converter.save_model_info(f'{rootpath}/cnn_info.txt', cnn_inputs, cnn_outputs, cnn_solver) | |
| # Run additional examples | |
| demonstrate_resnet_block(rootpath) | |
| demonstrate_element_wise_ops(rootpath) | |
| demonstrate_advanced_ops() | |
| logging.info("\n" + "="*70) | |
| logging.info("COMPLETE OPERATION SUPPORT:") | |
| logging.info("="*70) | |
| logging.info("\nCONVOLUTIONAL LAYERS:") | |
| logging.info(" ✓ Conv2D (with stride, padding, dilation)") | |
| logging.info(" ✓ MaxPool2D, AvgPool2D") | |
| logging.info(" ✓ Global Average/Max Pooling") | |
| logging.info("\nNORMALIZATION:") | |
| logging.info(" ✓ Batch Normalization") | |
| logging.info("\nACTIVATIONS:") | |
| logging.info(" ✓ ReLU (exact)") | |
| logging.info(" ✓ LeakyReLU (exact)") | |
| logging.info(" ✓ Sigmoid (piecewise linear approx)") | |
| logging.info(" ✓ Tanh (piecewise linear approx)") | |
| logging.info(" ✓ Softmax (constraint-based approx)") | |
| logging.info("\nELEMENT-WISE OPERATIONS:") | |
| logging.info(" ✓ Add, Subtract") | |
| logging.info(" ✓ Multiply, Divide") | |
| logging.info("\nTENSOR OPERATIONS:") | |
| logging.info(" ✓ Concat, Split") | |
| logging.info(" ✓ Reshape, Transpose") | |
| logging.info(" ✓ Slice, Pad") | |
| logging.info(" ✓ Flatten") | |
| logging.info("\nARCHITECTURES:") | |
| logging.info(" ✓ Feedforward Networks") | |
| logging.info(" ✓ CNNs") | |
| logging.info(" ✓ ResNet Blocks (with skip connections)") | |
| logging.info(" ✓ Multi-path networks") | |
| logging.info("\nVERIFICATION USE CASES:") | |
| logging.info(" • Adversarial robustness") | |
| logging.info(" • Property verification") | |
| logging.info(" • Input-output constraints") | |
| logging.info(" • Safety-critical validation") | |
| logging.info("="*70) | |
| results = run_all_tests(rootpath) + "\n" + calculate_checksums(rootpath) | |
| logging.info(results) | |
| return hashlib.sha256(results.encode()).hexdigest() | |
| if __name__ == "__main__": | |
| from sys import argv | |
| print(main()) if len(argv) < 2 else print(main(argv[1])) | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/python3.12 | |
| import sys | |
| from os import remove, popen | |
| from os.path import exists, realpath, dirname | |
| from os import getenv | |
| from pytest import mark | |
| @mark.skipif(sys.version_info >= (3, 14), | |
| reason="Skipping ONNX tests for Python 3.14") | |
| def test_onnx2z3(monkeypatch, request): | |
| from onnx2z3_ex import main | |
| root_dir = str(request.config.rootpath) + '/' | |
| with monkeypatch.context() as m: | |
| test_path = dirname(realpath(root_dir + getenv('PYTEST_CURRENT_TEST').split(':')[0])) | |
| onnx_models = ["multipath.onnx", "positive_net.onnx", "resnet_block.onnx", | |
| "simple_cnn.onnx", "simple_nn.onnx", "sum_net.onnx", "test_classifier.onnx", | |
| 'input_synthesis_solver.smt2', 'input_synthesis_info.txt', | |
| 'cnn_solver.smt2', 'cnn_info.txt', | |
| 'property_verification_solver.smt2', 'property_verification_info.txt', | |
| 'simple_nn_solver.smt2', 'simple_nn_info.txt'] | |
| for onnx_model in onnx_models: | |
| model_full_path=f"{test_path}/onnx_model" | |
| if exists(model_full_path): | |
| remove(model_full_path) | |
| assert exists(model_full_path) == False | |
| print("") | |
| m.setattr(sys, 'argv', ['onnx2z3_ex']) | |
| result = main(test_path) | |
| assert result == 'b6e972f84b42785c434f290916f329a51825b029cf1a6c4480f8d0df308d0d5e' |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment