<html><head><meta name="color-scheme" content="light dark"></head><body><pre style="word-wrap: break-word; white-space: pre-wrap;">import gstlearn as gl
import gstlearn.document as gdoc
import numpy as np
import scipy.optimize
import matplotlib.pyplot as plt
import gstlearn.plot as gp
import urllib.request
#import tempfile

urlOise = "https://soft.minesparis.psl.eu/gstlearn/oise_SPDE"

def downloadFile(filename, verbose=False):
    try:
        url = urlOise + "/" + filename
        fullname, head = urllib.request.urlretrieve(url)
        if (verbose):
            print(url, " successfully download!")
        return fullname
    except:
        print("Error: Cannot access ", url)
        pass
    return fullname

def extendSel(grid,name,newname = None,n=0):
    if newname is None:
        newname = name
    vmin = 0.5
    vmax = 1.5
    nxy = grid.getNXs()
    image2 = gl.BImage(nxy)
    tab = grid.getColumn(name,useSel=False)
    image = gl.morpho_double2image(nxy,tab,vmin,vmax)
    localVD = gl.VectorDouble(len(tab))
    gl.morpho_dilation(0, [n,n], image, image2)
    gl.morpho_image2double(image2, 0, 1., 0., localVD)
    grid[newname] = localVD.getVector()
    return localVD


def createOiseData(selvario = False):
    print("Create data")
    print(selvario)
    filename = downloadFile("Oise_Data_ThicknessSides.ascii")
    #filename = gdoc.loadData("Alluvial", "Oise_Data_ThicknessSides.ascii",version = "1.6.0")
    data = gl.Db.createFromNF(filename)
    data.clearLocators(gl.ELoc.L)
    data.clearLocators(gl.ELoc.U)
    data["error"] = 0 * data["rank"] + 0.1
    data.setLocator("error",gl.ELoc.V)
    data.clearLocators(gl.ELoc.SEL)
    if selvario:
        data["selVario"] = data["ThicknessSides"] != 0
        data.setLocator("selVario",gl.ELoc.SEL)
    data.setLocators(["ThicknessSides"],gl.ELoc.Z,cleanSameLocator=True)
    data.display()
    return data

def createOiseGrid(nborder=0):
    filename = downloadFile("Oise_GridAnglesModifFinal.ascii")
    #filename = gdoc.loadData("Alluvial", "Oise_GridAnglesModifFinal.ascii",version = "1.6.0")

    grid = gl.DbGrid.createFromNF(filename)
    grid.setLocator("Polygon.*", gl.ELoc.SEL)
    grid.deleteColumns(["spde*"])
    #grid["ranges0"] = 0 * grid["rank"] + 100.
    #grid["ranges1"] = 0 * grid["rank"] + 100.
    grid["ranges0"] = 0 * grid["rank"] + 200.
    grid["ranges1"] = 0 * grid["rank"] + 200.
    return grid

def makeOiseModel(grid = None,filemodel="",nborder = 0,stationary = False, drift = 1):
    if filemodel != "":
        model = gl.Model.createFromNF(filemodel)
    else:
        ranges = [8000,800]
        model = gl.Model.createFromParam(gl.ECov.MATERN,param = 1, ranges = ranges,angles = [10,0])
    if grid is not None and not stationary:
        model.getCovAniso(0).makeAngleNoStatDb("Migrate.angles_interp",0,grid)
    else : 
        model.getCovAniso(0).setAnisoAngle(0,45)
    #model.getCova(0).makeRangeNoStatDb("ranges0",0,grid)
    #model.getCova(0).makeRangeNoStatDb("ranges1",1,grid)
    model.setDriftIRF(drift)
    return model

def splitTrainTestFromInd(testind,data,a = None):
    if a is None:
        a = np.arange(data.getNSample())
    trainind = np.setdiff1d(a,testind)
    train = gl.Db.createReduce(data,ranks = trainind)
    test =gl.Db.createReduce(data,ranks=testind)
    return train, test
    

def splitTrainTest(data,ratiotest=0.1,seed = 13124, a = None):
    if ratiotest == 0:
        print("No test set")
        return data, None
    if a is None:
        a = np.arange(data.getNSample())
    np.random.seed(seed)
    np.random.shuffle(a)
    testind = a[range(int(data.getNSample()*ratiotest))]
    return splitTrainTestFromInd(testind,data,a)
    


class optimLikelihood :
    def __init__(self,
                 dat  , 
                 model,
                 grid       = None,
                 spde       = True,
                 flag_angle = False,
                 a0m        = 0.1,  a0M = 500,    #Variance totale
                 a1m        = 0.01, a1M = 0.99,   #Proportion erreur mesure
                 a2m        = 10  , a2M = 20000,  #Range 1
                 a3m        = 10  , a3M = 20000): #Range 2
           
        self.spde = spde
        #ParamÃ©trisation 1 (affichage)
        #newparam[0] = variance de l'erreur de mesure
        #newparam[1] = sill du Matern
        #newparam[2] et newparam[3] = ranges

        #ParamÃ©trisation 2 (pour dÃ©finir facilement les contraintes)
        #param[0] = variance totale
        #param[1] = proportion de variance associÃ©e Ã&nbsp; l'erreur de mesure
        #param[2] et param[3] ranges 

        #min et max des paramÃ¨tres (paramÃ©trisation 2)
        self.a0m = a0m
        self.a0M = a0M
        self.a1m = a1m
        self.a1M = a1M
        self.a2m = a2m 
        self.a2M = a2M
        self.a3m = a3m
        self.a3M = a3M

        self.model = model
        self.flag_angle = flag_angle
        self.grid = grid
        if self.spde :
            self.mesh  = buildMesh(self.grid)

            self.like = lambda : gl.logLikelihoodSPDE(self.datwork, self.model, mesh = self.mesh)
        else :
            self.like = lambda : self.model.computeLogLikelihood(self.datwork)
        self.datwork = dat
        

#Fonction pour revenir Ã&nbsp; la paramÃ©trisation de l'optimisation
#Sert Ã&nbsp; fixer les paramÃ¨tres initiaux dans la paramÃ©trisation 1
 
    def param2box(self,newparam):
        var = newparam[0] + newparam[1]
        p1  = newparam[0]/var
        
        param = [(var - self.a0m) / (self.a0M - self.a0m),
                (p1  - self.a1m) / (self.a1M - self.a1m),
                (newparam[2] - self.a2m) / (self.a2M - self.a2m),
                (newparam[3] - self.a3m) / (self.a3M - self.a3m)]
        if self.flag_angle:
            param.append(np.mod(newparam[4], 180))
        return param
         

    def box2param(self,param):
        var = param[0] * (self.a0M - self.a0m) + self.a0m
        p1  = param[1] * (self.a1M - self.a1m) + self.a1m
        newparam = [var * p1,
                    var * (1-p1),
                    param[2] * (self.a2M - self.a2m) + self.a2m,
                    param[3] * (self.a3M - self.a3m) + self.a3m]
        if self.flag_angle:
            newparam.append(param[4])
        return newparam

    def fOptim(self,param):
        newparam = self.box2param(param)
        self.datwork["error"] = 0 * self.datwork["rank"] + newparam[0]
        self.model.getCovAniso(0).setSill(newparam[1])
        self.model.getCovAniso(0).setRange(0,newparam[2])#useless in non stationary case
        self.model.getCovAniso(0).setRange(1,newparam[3])#idem
        if self.flag_angle:
            self.model.getCovAniso(0).setAnisoAngle(0,newparam[4])
        indpoly = np.where(self.grid["Polygon"])[0]
        v = self.grid["ranges0"]
        v[indpoly] = newparam[2]
        self.grid[:,"ranges0"] = v
        v = self.grid["ranges1"]
        v[indpoly] = newparam[3]
        self.grid[:,"ranges1"] = v 
        res = -self.like()
        print(newparam)
        #self.model.display()
        print(res)
        return res
    
    def optimize(self, x0 = [0.1,0.5,0.4,0.1], 
                 bounds = [(0,1),(0,1),(0,1),(0,1)]):
        return scipy.optimize.minimize(self.fOptim,x0,bounds = bounds)

    def kriging(self,out):
        if self.spde:
            gl.krigingSPDE(self.datwork,out,self.model,domain = self.grid)
        else:
            gl.kriging(self.datwork,out,self.model,gl.NeighUnique())


def postProcess(data= None,train = None):
    data["error"] = 0 * data["rank"] + train["error"][0]
    data.setLocator("error",gl.ELoc.V)
   
   
def crossval(train,test,selvario=True):
    ind = np.arange(test.getNSample())
    if selvario:
        ind = np.where(test["ThicknessSides"]!=0)[0]
    res = test["Kriging*estim"]
    rmse = np.sqrt(np.mean((res[ind]-test["ThicknessSides"][ind])**2))
    print("rmse = " + str(rmse))
    mae = np.mean(np.abs(res[ind]-test["ThicknessSides"][ind]))
    print("mae = " + str(mae))
    meantrain = np.mean(train["ThicknessSides"])
    valm = np.sqrt(np.mean((test["ThicknessSides"][ind]-meantrain)**2))
    print("rmse when predicting by the mean = " + str(valm))
    mae = np.mean(np.abs(meantrain-test["ThicknessSides"][ind]))
    print("mean when predicting by the mean = " + str(mae))
def map(grid):
    plt.figure(figsize=[20,12])
    gp.raster(grid,cmap="turbo", flagLegend = True,vmin = 0,vmax = 15)

def workflowCompute(spde = False, stationary = False,selvario = True, border = False, ratio = 0.1, flag_angle = False, nborder = 0, drift = 1):
    data = createOiseData(selvario)
    print("Data created")
    train,test = splitTrainTest(data, ratiotest = ratio)
    grid = createOiseGrid(nborder)
    model = makeOiseModel(grid,"",nborder= nborder,stationary=stationary,drift = drift)
    datawork = train
    coptim = optimLikelihood(datawork,model,grid,spde,flag_angle=flag_angle)
    x0 = [0.1,0.5,0.4,0.1]
    bounds = [(0,1),(0,1),(0,1),(0,1)]
    if flag_angle:
        x0.append(0)
        bounds.append((-720,720))
    result = coptim.optimize(x0 = x0, bounds = bounds)
    print("End optimization")
    print("Kriging on test set")
    if ratio != 0:
        coptim.kriging(test)
        postProcess(data,train)

    print("Kriging on the grid")
    if not spde:
        print("Be patient")
    coptim.kriging(grid)

    return coptim, data,grid, train, test, model


def workflowPost(train = None,test=None,grid=None,selvario = True):
    if train is not None and test is not None:
       crossval(train,test,selvario)
    if grid is not None:
        map(grid)


def buildMesh(grid):
    extendSel(grid,"Polygon","border",n=20)
    grid.setLocator("border",gl.ELoc.SEL)
    gridMesh = grid.refine([1,2])
    gridMesh.deleteColumn("border")
    gl.migrate(grid,gridMesh,"border",flag_fill=True)
    gridMesh.setName("Migrate","border")
    v = gridMesh["border"]
    v[np.isnan(gridMesh["border"])]=0
    gridMesh[:,"border"] = v
    gridMesh.setLocator("border",gl.ELoc.SEL)
    return gl.MeshETurbo.createFromGrid(gridMesh,True)
</pre></body></html>