Taking a byte out of U-SQL

I recently had a requirement to combine a set of approx. 100 CSV files into a single file and encountered an interesting problem along the way.

The files all had the same simple two-column structure. I thought this would make for a pretty straightforward exercise – until I started processing the files.

I decided to use U-SQL to complete the task, as I figured the amount of code required would be pretty lightweight and I could just run the job locally on my machine using Visual Studio. I created a U-SQL project and added a U-SQL item.

Here is what the U-SQL looked like for the first pass:

DECLARE @in = "/myfiles/{name}.csv";
DECLARE @out = "/result.csv";

@input =
    EXTRACT status string,
            pages string,
            name string
    FROM @in
    USING Extractors.Csv(skipFirstNRows:1);

OUTPUT @input
TO @out
USING  Outputters.Csv(outputHeader : true);

Pretty standard U-SQL – however, when running the job I encountered the following error:

Execution failed with error '1_SV1_Extract Error : '
	{
	"diagnosticCode":195887140,
	"severity":"Error",
	"component":"RUNTIME",
	"source":"User",
	"errorId":"E_RUNTIME_USER_EXTRACT_ROW_ERROR",
	"message":"Error occured while extracting row after processing 227 record(s) in the vertex' input split. Column index: 1, column name: 'pages'.",
	"description":"",
	"resolution":"",
	"helpLink":"",
	"details":"",
	"internalDiagnostics":"",
	"innerError":
		{
		"diagnosticCode":195887137,
		"severity":"Error",
		"component":"RUNTIME",
		"source":"User",
		"errorId":"E_RUNTIME_USER_EXTRACT_COLUMN_CONVERSION_TOO_LONG",
		"message":"Value too long failure when attempting to convert column data.",
		"description":"Can not convert string to proper type. The resulting data length is too long."

The error wasn’t too helpful, but I figured there were rows in some of the files that were causing issues. So I added the silent:true parameter to the extractor, thinking this might help by ignoring the corrupt rows and at least let me get the good rows from the source CSV files.

DECLARE @in = "/myfiles/{name}.csv";
DECLARE @out = "/result.csv";

@input =
    EXTRACT status string,
            pages string,
            name string
    FROM @in
    USING Extractors.Csv(skipFirstNRows:1, silent:true);

OUTPUT @input
TO @out
USING  Outputters.Csv(outputHeader : true);

This now produced a different error:

message":"Row size exceeds the maximum allowed size of 4194304 bytes","description":

It seemed some of the rows in my CSV files exceeded an upper limit on how much the Extractors.Csv function can handle, and adding the silent:true parameter didn’t solve the issue.

I dug a bit deeper and found rows in some of the files that were long – really long. One row in particular was 47MB, and it was valid data. I could have edited these rows by hand, but thought I’d see if I could solve it another way.
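As an aside, a quick standalone scan along the lines of the sketch below is one way you could confirm which files contain oversized rows – the folder path is just a placeholder and this snippet isn’t part of the solution itself:

using System;
using System.IO;
using System.Linq;

class LongRowCheck
{
    static void Main()
    {
        // Placeholder path to the folder holding the source CSV files
        var folder = @"C:\myfiles";

        foreach (var file in Directory.GetFiles(folder, "*.csv"))
        {
            // File.ReadLines streams the file, so only one row is held in memory at a time
            var longest = File.ReadLines(file).Max(line => line.Length);
            Console.WriteLine($"{Path.GetFileName(file)}: longest row = {longest:N0} characters");
        }
    }
}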

After some internet research and a couple of helpful tweets to and from Michael Rys, I decided to have a go at making my own custom U-SQL extractor.

The final extractor code is below. There is a fair amount of customisation in my example to suit my particular requirement – it is by no means a generic custom extractor for a wider variety of situations – but with minor tweaks it might help if you have similar issues.

The essence of the problem is that a 47MB row is just too large for most C# Split functions – certainly for any that I found.

My solution is to create a custom extractor that reads through the input files one byte at a time. The extractor begins at the start of the file and reads every byte until the end of the file using the following C# method:

input.BaseStream.ReadByte()

I wrap the call in a while loop; ReadByte() returns the numeric value of the current byte (or -1 at the end of the stream), which allows me to test whether the value is a delimiter or data I should add to my string value.

I maintain two strings per row (one per column) and every time ReadByte() encounters the row delimiter, I reset the string values and return the row to U-SQL using the output.AsReadOnly() function.

I only care about the first 1024 bytes per column, so beyond that I stop storing the data from the input file but continue reading until the next column or row delimiter. This is how I avoid the overflow errors.
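To illustrate the idea outside of U-SQL, here is a minimal standalone C# sketch of the same byte-by-byte approach – it simply prints each value it finds, and the sample.csv path is a placeholder rather than anything from my actual job:

using System;
using System.IO;
using System.Text;

class ByteReaderDemo
{
    static void Main()
    {
        const int MaxValue = 1024;                // cap per value, matching the extractor below
        var buffer = new byte[MaxValue];
        int colCounter = 0;

        using (var stream = File.OpenRead("sample.csv")) // placeholder file name
        {
            int b;
            while ((b = stream.ReadByte()) >= 0)  // ReadByte() returns -1 at end of stream
            {
                if (b == '\n' || b == ',')        // row or column delimiter
                {
                    Console.WriteLine(Encoding.UTF8.GetString(buffer, 0, colCounter));
                    Array.Clear(buffer, 0, MaxValue);
                    colCounter = 0;
                }
                else if (colCounter < MaxValue)   // ignore anything past the first 1024 bytes
                {
                    buffer[colCounter++] = (byte)b;
                }
            }

            if (colCounter > 0)                   // flush the last value if there is no trailing newline
            {
                Console.WriteLine(Encoding.UTF8.GetString(buffer, 0, colCounter));
            }
        }
    }
}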

It’s pretty quick too. 🙂

Here is the code-behind that you can paste into a script.usql.cs file.

using Microsoft.Analytics.Interfaces;
using System;
using System.Collections.Generic;
using System.Text;

namespace CustomExtractor
{
    [SqlUserDefinedExtractor(AtomicFileProcessing = false)]
    public class PhilExtract : IExtractor
    {
        private readonly Encoding _encoding;
        private byte[] _row_delim;
        private byte[] _col_delim;

        public PhilExtract(Encoding encoding, string row_delim = "\n", string col_delim = ",")
        {
            this._encoding = ((encoding == null) ? Encoding.UTF8 : encoding);
            this._row_delim = this._encoding.GetBytes(row_delim);
            this._col_delim = this._encoding.GetBytes(col_delim);
        }

        public override IEnumerable<IRow> Extract(IUnstructuredReader input, IUpdatableRow output)
        {
            string col1 = "";
            int MaxValue = 1024;                   // only the first 1024 bytes of each column are kept
            byte[] bcol1 = new byte[MaxValue];     // buffer for the column currently being read
            int currentByte = 0;                   // value returned by ReadByte(), or -1 at end of stream
            int rowCounter = 0;                    // rows seen so far; row 0 (the header) is not emitted
            int colCounter = 0;                    // number of bytes stored for the current column
            int col = 0;                           // index of the current column

            while (currentByte >= 0)
            {
                currentByte = input.BaseStream.ReadByte();

                if (currentByte == -1 && colCounter == 0 && col == 0)
                {
                    break;                         // end of stream with no pending data: nothing left to emit
                }

                // Only the first byte of each delimiter is compared, so single-byte delimiters are assumed
                if (currentByte == _row_delim[0] || currentByte == -1) // row delimiter or end of stream
                {
                    if (rowCounter > 0)
                    {
                        // Finish the current column, then hand the completed row back to U-SQL
                        col1 = System.Text.Encoding.UTF8.GetString(bcol1, 0, colCounter).Replace("\"", "");
                        if (col < 2)
                        {
                            output.Set<string>(col, col1);
                        }
                        yield return output.AsReadOnly();
                    }
                    Array.Clear(bcol1, 0, MaxValue);
                    colCounter = 0;
                    rowCounter++;
                    col = 0;
                }
                else if (currentByte == _col_delim[0] && col < 2 && rowCounter > 0)
                {
                    // Column delimiter: store the completed column value and move to the next column
                    col1 = System.Text.Encoding.UTF8.GetString(bcol1, 0, colCounter).Replace("\"", "");
                    output.Set<string>(col, col1);
                    Array.Clear(bcol1, 0, MaxValue);
                    colCounter = 0;
                    col++;
                }
                else if (colCounter < MaxValue)
                {
                    // Store the byte only while we are within the first 1024 bytes of the column
                    bcol1[colCounter] = Convert.ToByte(currentByte);
                    colCounter++;
                }
            }
            yield break;
        }
    }
}

 

The U-SQL to call the above extractor is:

DECLARE @in = "/myfiles/{name}.csv";
DECLARE @out = "/result.csv";

@input =
    EXTRACT status string,
            pages string,
            name string
    FROM @in
    USING new CustomExtractor.PhilExtract(Encoding.UTF8, "\n", ",");

OUTPUT @input
TO @out
USING  Outputters.Csv(outputHeader : true);

This worked a treat. It was my first crack at a custom U-SQL extractor, but it shows the level of flexibility available over the standard extractors. I’d expect the standard extractors to improve over time, but if this helps you in any way that’s great – I’d be keen to hear if it does.

Note that the {name} virtual column still works as expected and adds a third column to my output file, which in this case carries the name of the input file each row originated from.
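If you wanted to rename that third column before writing the result out, one possible tweak (not something from the job above) is to slot a SELECT between the EXTRACT and the OUTPUT, and output the renamed rowset instead of @input:

// Possible variation: rename the virtual column before writing the result
@renamed =
    SELECT status,
           pages,
           name AS source_file
    FROM @input;

OUTPUT @renamed
TO @out
USING Outputters.Csv(outputHeader : true);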

Philip Seamark
Phil is a Microsoft Data Platform MVP and an experienced database and business intelligence (BI) professional with a deep knowledge of the Microsoft BI stack, along with extensive knowledge of data warehouse (DW) methodologies and enterprise data modelling. He has 25+ years’ experience in this field and is an active member of the Power BI community.
