View Javadoc

1   /*
2    * Copyright 2008 ATG Import Service Project
3    *
4    * Licensed under the Apache License, Version 2.0 (the "License");
5    * you may not use this file except in compliance with the License.
6    * You may obtain a copy of the License at
7    * 
8    *   http://www.apache.org/licenses/LICENSE-2.0
9    * 
10   * Unless required by applicable law or agreed to in writing, software
11   * distributed under the License is distributed on an "AS IS" BASIS,
12   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   * See the License for the specific language governing permissions and
14   * limitations under the License.
15   */
16  
17  package atg.adapter.gsa.xml;
18  
19  import java.io.File;
20  import java.io.FileWriter;
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.Collection;
24  import java.util.Formatter;
25  import java.util.HashMap;
26  import java.util.HashSet;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.Set;
31  
32  import atg.adapter.gsa.GSAItem;
33  import atg.adapter.gsa.GSAItemDescriptor;
34  import atg.adapter.gsa.GSAPropertyDescriptor;
35  import atg.adapter.gsa.GSARepository;
36  import atg.adapter.gsa.xml.TemplateParser;
37  import atg.adapter.util.RepositoryUtils;
38  import atg.beans.DynamicPropertyDescriptor;
39  import atg.deployment.repository.PropertyDescriptorInfo;
40  import atg.dtm.TransactionDemarcation;
41  import atg.dtm.TransactionDemarcationException;
42  import atg.epub.ImportService;
43  import atg.nucleus.Nucleus;
44  import atg.repository.ConcurrentUpdateException;
45  import atg.repository.MutableRepository;
46  import atg.repository.MutableRepositoryItem;
47  import atg.repository.Repository;
48  import atg.repository.RepositoryException;
49  import atg.repository.RepositoryItem;
50  import atg.repository.RepositoryPropertyDescriptor;
51  import atg.repository.RepositoryItemDescriptor;
52  import atg.versionmanager.WorkingContext;
53  
54  import atg.epub.ImportTimer;
55  
56  public class ImportWorkerThread extends Thread
57  {
58    private GSARepository mTargetRepository = null;
59    
60    private boolean mLogging = false;
61    private ParserContext context;
62  
63    private int mThreadIndex;
64    private ImportService mImportService;
65    private int mStartSegment;
66    private int mEndSegment;
67    private int mPhase = ImportService.PHASE_ADD_UPDATE;
68    private boolean mContinueRunning;
69    private int mErrorThresholdPercentage; 
70    
71    File mLogFile;
72    FileWriter mLogFileWriter;
73    
74    public void setPhase (int pPhase)
75    {
76      mPhase = pPhase;
77    }
78    
79    public ImportWorkerThread
80    (
81      int pThreadIndex,
82      ImportService pImportService,
83      int pStartSegment,
84      int pEndSegment
85    )
86    {
87      mThreadIndex = pThreadIndex;
88      mImportService = pImportService;
89      mStartSegment = pStartSegment;
90      mEndSegment = pEndSegment;
91      mContinueRunning = true;
92    }
93          
94    public void run()
95    {
96      if (mLogging)
97      {
98        createLogFile ();
99      }
100     
101     // Get the error threshold percentage.
102     
103     mErrorThresholdPercentage = mImportService.getErrorThresholdPerBatch();
104     
105     // Get the target repository.
106     
107     mTargetRepository = (GSARepository) mImportService.getTargetRepository();
108 
109     //logMessage ("THREAD (" + this.getName() + "): Pushing Workspace Id: " + mImportService.getWorkspace().getID());
110 
111     WorkingContext.pushDevelopmentLine (mImportService.getWorkspace());
112 
113     synchronized (this)
114     {
115       while (mContinueRunning)
116       {
117         try
118         {
119           //logMessage ("THREAD (" + this.getName() + "): Waiting for a signal to start on (" + mStartSegment + "," + mEndSegment + ")");
120           wait();
121           //logMessage ("THREAD (" + this.getName() + "): PHASE START: " + mPhase);
122         }
123         catch(InterruptedException ie)
124         {
125           //logMessage ("THREAD (" + this.getName() + "): Received the terminate interrupt");
126           mContinueRunning = false;
127 
128           // Clean up
129         }
130         
131         if (mImportService.getCancelCommandReceived())
132         {
133           mContinueRunning = false;
134         }
135 
136         if (mContinueRunning)
137         {
138           // Start the current phase.
139           
140           //logMessage ("THREAD (" + this.getName() + "): Notifying the Import Service that thread is processing");
141 
142           mImportService.notifyThreadStatus (mThreadIndex, this.getName(), ImportService.THREAD_STATUS_PROCESSING);
143           
144           processPhase ();
145           
146           // Notify the import service that we've completed this phase.
147           
148           //logMessage ("THREAD (" + this.getName() + "): PHASE COMPLETE. Notifying the import Service.");
149 
150           mImportService.notifyThreadStatus (mThreadIndex, this.getName(), ImportService.THREAD_STATUS_PHASE_COMPLETED);
151         }
152       }
153     }
154     
155     //logMessage ("THREAD (" + this.getName() + "): DONE");
156   }
157   
158   private void processPhase ()
159   {  
160     String segmentFilename;
161     File segmentFile;
162     ImportFileParser importFileParser;
163     ImportItem[] importItems;
164     ImportTimer databaseTimer = new ImportTimer (this.getName() + " DATABASE TIMER" + " (" + mPhase + ")");
165     int count = 0;
166     int total = (mEndSegment - mStartSegment) + 1;
167 
168     // Process each of the segments that have been assigned.
169     
170     for (int segmentIndex = mStartSegment; segmentIndex <= mEndSegment; segmentIndex++)
171     {
172       count++;
173       
174       if (mImportService.getCancelCommandReceived())
175       {
176         mContinueRunning = false;
177         System.out.println ("THREAD (" + this.getName() + "): Interrupted during segment processing.");
178         break;
179       }
180       
181       // Get the file to import.
182       
183       segmentFilename = ImportService.M_SEGMENT_FILE_STUB + segmentIndex + ImportService.M_SEGMENT_FILE_EXTENSION;
184       segmentFile = new File (mImportService.getImportDirectory(), segmentFilename);
185 
186       importFileParser = new ImportFileParser (mPhase, segmentFile);      
187       importItems = importFileParser.parseFile ();
188       
189       databaseTimer.start();
190       
191       if (!processItems (importItems, segmentIndex))
192       {
193         System.out.println ("THREAD (" + this.getName() + "): Batch processing failed. Logging details with parent.");
194         mImportService.logBatchFailure (segmentIndex, mPhase);
195       }
196 
197       databaseTimer.stop();
198       databaseTimer.display (" : " + count + " / " + total);
199     }
200 
201     //logMessage ("********** END PHASE ***********");
202 
203     return;
204   }
205   
206   private boolean processItems
207   (
208     ImportItem[] pImportItems,
209     int pSegmentIndex
210   )
211   {
212     boolean batchProcessed = false;
213     float errorPercentage = 0;
214     TransactionDemarcation transactionDemarcation = new TransactionDemarcation();
215     boolean rollback = true;
216     boolean deleteSuccessful = true;
217     int itemIndex = 0;
218     HashMap<Integer,ImportFailedItem> failedItems = new HashMap();
219     ImportFailedItem failedItem;
220     int successCount = 0;
221 
222     while ((!batchProcessed) && (errorPercentage < mErrorThresholdPercentage))
223     {
224       if (mImportService.getCancelCommandReceived())
225       {
226         mContinueRunning = false;
227         System.out.println ("THREAD (" + this.getName() + "): Interrupted during batch processing.");
228         break;
229       }
230       
231       try
232       {
233         transactionDemarcation.begin (mTargetRepository.getTransactionManager());
234         
235         successCount = 0;
236       
237         for (itemIndex = 0; itemIndex < pImportItems.length; itemIndex++)
238         {
239           //logMessage ("START Item: " + pImportItems[itemIndex].getItemDescriptor()
240           //                                        + " Id: " + pImportItems[itemIndex].getItemId());
241         
242           if (mImportService.getCancelCommandReceived())
243           {
244             mContinueRunning = false;
245             System.out.println ("THREAD (" + this.getName() + "): Interrupted during item processing.");
246             break;
247           }
248           
249           // Skip failed items
250           
251           if (failedItems.containsKey (Integer.valueOf (itemIndex)))
252           {
253             continue;
254           }
255 
256           switch (mPhase)
257           {
258             case ImportService.PHASE_ADD_UPDATE:
259 
260               // If the action is delete, then just set the status and move on
261               // for this phase
262             
263               if (pImportItems[itemIndex].getAction() != ImportItem.M_ACTION_DELETE)
264               {
265                 importItem (pImportItems[itemIndex]);
266               }
267             
268               break;
269             
270             case ImportService.PHASE_REFERENCE_UPDATE:
271             
272               // If the action is delete, then just set the status and move on
273               // for this phase
274             
275               if (pImportItems[itemIndex].getAction() != ImportItem.M_ACTION_DELETE)
276               {
277                 importItem (pImportItems[itemIndex]);
278               }
279             
280               break;
281             
282             case ImportService.PHASE_DELETE:
283              
284               if (pImportItems[itemIndex].getAction() == ImportItem.M_ACTION_DELETE)
285               {
286                 //logMessage ("processItems: Calling deleteItem for: " +
287                 //     pImportItems[itemIndex].getItemDescriptor() + " : " +
288                 //     pImportItems[itemIndex].getItemId()); 
289 
290                 deleteSuccessful = deleteItem (pImportItems[itemIndex]);
291               
292                 if (!deleteSuccessful)
293                 { 
294                   //logMessage ("processItems: Delete failed. Add it to the list: " +
295                   //                           pImportItems[itemIndex].getItemDescriptor() + " : " +
296                   //                           pImportItems[itemIndex].getItemId());
297                   
298                   // Add to the items. No need to log failure now as we will be retrying this.
299                   
300                   mImportService.getFailedDeletions().add (pImportItems[itemIndex]);
301                 }
302               }
303             
304               break;
305           }
306 
307           //logMessage ("END Item: " + pImportItems[itemIndex].getItemDescriptor()
308           //                              + " Id: " + pImportItems[itemIndex].getItemId());
309           
310           successCount++;
311         
312         } // End for
313       
314         if (mContinueRunning)
315         {
316           rollback = false;
317           batchProcessed = true;
318         }
319       }
320       catch (TransactionDemarcationException e)
321       {
322         e.printStackTrace();
323       }
324       catch (RepositoryException e)
325       {
326         // Add to list
327         
328         failedItem = new ImportFailedItem ();
329         
330         failedItem.setItemId(pImportItems[itemIndex].getItemId());
331         failedItem.setItemDescriptor(pImportItems[itemIndex].getItemDescriptor());
332         failedItem.setAction(pImportItems[itemIndex].getAction());
333         failedItem.setMesssage(e.getMessage());
334         failedItem.setBatchFilename (ImportService.M_SEGMENT_FILE_STUB + pSegmentIndex + ImportService.M_SEGMENT_FILE_EXTENSION);
335         
336         failedItems.put (Integer.valueOf (itemIndex), failedItem);
337         
338         errorPercentage = ((float) failedItems.size() / (float) pImportItems.length) * 100;
339       }
340       finally
341       {
342         try
343         {
344           transactionDemarcation.end (rollback);
345         }
346         catch (TransactionDemarcationException e)
347         {
348           e.printStackTrace();
349         }
350       }
351     }
352     
353     // Check if we have any errors to write.
354     
355     if (!failedItems.isEmpty())
356     {
357       for (Integer key : failedItems.keySet())
358       {
359         failedItem = (ImportFailedItem) failedItems.get(key);
360         mImportService.logFailedItem (failedItem, mPhase);
361       }
362 
363       failedItems.clear();
364       
365       // Tell the import service.
366       
367       mImportService.setDataImportErrors();
368     }
369     
370     // Return the result of the batch processing
371     
372     return (batchProcessed);
373   }
374 
375   // -------------------------------------
376   /***
377    * Imports the repository item into the target repository.
378    * 
379    * @param pImportItem the item to be imported.
380 
381    * @return true if the item was deployed, false otherwise
382    */
383   private void importItem (ImportItem pImportItem) throws RepositoryException
384   {
385     MutableRepositoryItem targetItem = null;
386 
387     if (mTargetRepository == null)
388     {
389       throw new RepositoryException ("No target repository specified");
390     }
391 
392     // Strict mode - higher performance
393     
394     if (mPhase == ImportService.PHASE_ADD_UPDATE)
395     {
396       // If action is update, either get the item, or marker that it should be created ??>
397       
398       //logMessage ("importItem: ADD UPDATE Phase");
399       
400       if (pImportItem.getAction() == ImportItem.M_ACTION_ADD)
401       {
402         //logMessage ("importItem: Action is ADD");
403         //logMessage ("Creating Item: " + pImportItem.getItemDescriptor() + " Id: " + pImportItem.getItemId());
404 
405         targetItem = mTargetRepository.createItem (pImportItem.getItemId(), pImportItem.getItemDescriptor());
406       }
407       else if (pImportItem.getAction() == ImportItem.M_ACTION_UPDATE)
408       {
409         //logMessage ("importItem: Action is UPDATE so get the item");
410 
411         targetItem = (MutableRepositoryItem) mTargetRepository.getItemForUpdate (
412                                          pImportItem.getItemId(), pImportItem.getItemDescriptor());
413         
414         if (targetItem == null)
415         {
416           //logMessage ("importItem: Didnt find the item and in strict mode so its an exception");
417           throw new RepositoryException ("Update item not found in target");
418         }
419       }
420     }
421     else
422     {
423       // Since we are not in the ADD_UPDATE phase we'll always need the target item.
424       
425       //logMessage ("importItem: Not ADD UPDATE Phase");
426 
427       //logMessage ("Retrieving Item: " + pImportItem.getItemDescriptor() + " Id: " + pImportItem.getItemId());
428       targetItem = (MutableRepositoryItem) mTargetRepository.getItemForUpdate (
429                                            pImportItem.getItemId(), pImportItem.getItemDescriptor());
430       
431       if (targetItem == null)
432       {
433         //logMessage ("importItem: Failed to get the item for Reference update. It should be there"); 
434       }      
435     }
436     
437     // Set the properties
438     
439     setProperties (mTargetRepository, targetItem, pImportItem);
440 
441     if ((mPhase == ImportService.PHASE_ADD_UPDATE) && (pImportItem.getAction() == ImportItem.M_ACTION_ADD))
442     {
443       //logMessage ("Adding Item: " + pImportItem.getItemDescriptor() + " Id: " + pImportItem.getItemId());
444       mTargetRepository.addItem (targetItem);
445     }
446     else
447     {
448       mTargetRepository.updateItem (targetItem);
449     }
450   }
451 
452   // -------------------------------------
453   /***
454    * This method attempts to delete the given item. If it fails its most likely because
455    * another item is referencing this one. It will get cleaned up in a later attempt.
456    * 
457    * @exception RepositoryException If this delete failed.
458    */
459   
460   private boolean deleteItem (ImportItem pImportItem) throws RepositoryException
461   {
462     boolean deleteSuccessful = true;
463     MutableRepositoryItem targetItem;
464     
465     if (mTargetRepository == null)
466     {
467       throw new RepositoryException ("No target repository specified");
468     }
469 
470     try
471     {
472       //logMessage ("deleteItem: Checking if deletion item exists: " + pImportItem.getItemDescriptor() +
473       //                                            " : " + pImportItem.getItemId());
474       
475       targetItem = (MutableRepositoryItem) mTargetRepository.getItem (
476                                      pImportItem.getItemId(), pImportItem.getItemDescriptor());
477 
478       if (targetItem != null)
479       {
480         //logMessage ("deleteItem: Item found. Removing references");
481 
482         RepositoryUtils.removeReferencesToItem (targetItem);
483         
484         //logMessage ("deleteItem: Removing item");
485 
486         mTargetRepository.removeItem (pImportItem.getItemId(), pImportItem.getItemDescriptor());
487 
488         //logMessage ("deleteItem: Item removed.");
489       }
490       else
491       {
492         //logMessage ("deleteItem: Item not found");
493         deleteSuccessful = false;
494       }
495     }
496     catch (RepositoryException e)
497     {
498       logMessage ("deleteItem: Got error removing item likely due to foreign reference. Will be removed later: " + e.getMessage());
499       return false;
500     }
501     catch (Throwable t)
502     {
503       logMessage ("deleteItem: Got a throwable exception: " + t.getMessage());
504       throw new RepositoryException (t);
505     }
506 
507     return deleteSuccessful;
508   }
509   
510   private void setProperties
511   (
512     GSARepository current,
513     MutableRepositoryItem item,
514     ImportItem pImportItem
515   ) throws RepositoryException
516   {
517     ParserContext context = new ParserContext();
518     HashMap<String, String> properties;
519     String valueString;
520     RepositoryPropertyDescriptor repositoryPropertyDescriptor = null;
521     boolean setProperty = true;
522     boolean generatedProperty = false;
523     RepositoryItemDescriptor referencedItemDescriptor = null;
524     Object value = null;
525 
526     // Set property values
527     // Only set non-required reference properties on PASS_REFERENCE_UPDATE
528 
529     properties = pImportItem.getProperties();
530 
531     for (String key : properties.keySet())
532     {
533       valueString = properties.get (key);
534       
535       repositoryPropertyDescriptor =
536         (RepositoryPropertyDescriptor) item.getItemDescriptor().getPropertyDescriptor(key);
537   
538       if (repositoryPropertyDescriptor == null)
539       {
540         throw new RepositoryException (
541         "Cannot get property descriptor for property " + key + " on item "
542             + item.getItemDescriptor().getItemDescriptorName());
543       }
544       
545       setProperty = true;
546       generatedProperty = false;
547 
548       if (mPhase == ImportService.PHASE_ADD_UPDATE)
549       {
550         // If it's required we must set it.
551         
552         if (repositoryPropertyDescriptor.isRequired())
553         {
554           setProperty = true;
555 
556           // If its a required reference we must generate a dummy value.
557           
558           if (repositoryPropertyDescriptor.getPropertyType() == RepositoryItem.class)
559           {
560             referencedItemDescriptor = repositoryPropertyDescriptor.getPropertyItemDescriptor();
561             
562             value = mImportService.getReferenceItemGenerator().findDummyReferenceItem (
563                                                      (MutableRepository) referencedItemDescriptor.getRepository(),
564                                                      referencedItemDescriptor.getItemDescriptorName(),
565                                                      (GSAPropertyDescriptor) repositoryPropertyDescriptor);
566 
567             generatedProperty = true;
568           }
569         }
570         else if (repositoryPropertyDescriptor.isIdProperty())
571         {
572           setProperty = false;
573         }
574         else if (repositoryPropertyDescriptor.getPropertyType() == RepositoryItem.class)
575         {
576           setProperty = false;
577         }
578         else if (repositoryPropertyDescriptor.isMultiValued() &&
579                            repositoryPropertyDescriptor.getComponentPropertyType() == RepositoryItem.class)
580         {
581           setProperty = false;
582         }
583       }
584       else if (mPhase == ImportService.PHASE_REFERENCE_UPDATE)
585       {
586         if (repositoryPropertyDescriptor.getPropertyType() == RepositoryItem.class)
587         {
588           setProperty = true;
589         }
590         else if (repositoryPropertyDescriptor.isMultiValued() &&
591                              repositoryPropertyDescriptor.getComponentPropertyType() == RepositoryItem.class)
592         {
593           setProperty = true;
594         }
595         else
596         {
597           setProperty = false;
598         }
599       }
600       else if (mPhase == ImportService.PHASE_DELETE)
601       {
602         setProperty = false;
603       }
604       else
605       {
606         setProperty = false;
607       }
608   
609       if (setProperty)
610       {
611         // Check for a generated value.
612 
613         if (!generatedProperty)
614         {
615           // Check that the xml string value is not null or empty.
616           
617           if ((valueString != null) &&
618             (!valueString.equalsIgnoreCase("null")) &&
619             (!valueString.equalsIgnoreCase("")))
620           {
621             // Convert the string to its object form.
622             
623             value = TemplateParser.convertStringToValue (
624                          current, context, pImportItem.getItemDescriptor(), key, valueString, item);
625           }
626           else
627           {
628             value = null;
629           }
630         }
631    
632         // Set the property.
633         
634         if (value != null)
635         {
636           item.setPropertyValue(key, value);
637         }
638       }
639     }
640   }
641 
642   private void displayImportItem (ImportItem pImportItem)
643   {
644     String propertyName;
645     String propertyValue;
646 
647     System.out.println ("THREAD (" + this.getName() + "): Property: Item Id Value: " + pImportItem.getItemId());    
648     System.out.println ("THREAD (" + this.getName() + "): Property: Item Descriptor Value: " + pImportItem.getItemDescriptor());    
649     System.out.println ("THREAD (" + this.getName() + "): Property: Action Value: " + pImportItem.getAction());    
650 
651     HashMap properties = pImportItem.getProperties();
652     Iterator iterator = properties.keySet().iterator();
653     
654     while (iterator.hasNext())
655     {
656       propertyName = (String) iterator.next();      
657       propertyValue = (String) properties.get (propertyName);
658 
659       System.out.println ("THREAD (" + this.getName() + "): Property: " + propertyName + " Value: " + propertyValue);
660     }    
661   }
662  
663   private void createLogFile ()
664   {
665     String logFilename = "C://Bol//BolImportData//logs//log_" + this.getName() + ".log";
666     
667     try
668     {
669       mLogFile = new File (logFilename);
670       mLogFile.createNewFile();
671       mLogFileWriter = new FileWriter (mLogFile, false);
672     }
673     catch (IOException e)
674     {
675       System.out.println ("IOException: " + e.getMessage());
676     }    
677   }
678 
679   private void logMessage (String pMessage)
680   {
681     if (mLogging)
682     {
683       try
684       {
685         mLogFileWriter.write (pMessage + "\n");
686         mLogFileWriter.flush();
687       }
688       catch (IOException e)
689       {   
690       }
691     }
692   }
693   
694   private void logImportItem (ImportItem pImportItem)
695   {
696     logMessage ("ITEM: Id->" + pImportItem.getItemId());
697     logMessage ("ITEM: Item Descriptor->" + pImportItem.getItemDescriptor());
698     logMessage ("ITEM: Action->" + pImportItem.getAction());
699 
700     Set properties = pImportItem.getProperties().keySet();
701     Iterator iterator = properties.iterator();
702     
703     while (iterator.hasNext())
704     {
705       String key = (String) iterator.next();
706       logMessage ("ITEM: " + key + "->" + pImportItem.getProperties().get(key));
707     }
708   }
709 }